From 3d43c08db9746e87bf84b1f35e24b41ae4286001 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 28 Dec 2021 23:23:44 -0500 Subject: [PATCH 1/6] reset `coalesceTimer` to nil as soon as the event is consumed --- agent/proxycfg/state.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 0b535d0e49be..db65a7df34b9 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -294,6 +294,8 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) { } case <-sendCh: + // Allow the next change to trigger a send + coalesceTimer = nil // Make a deep copy of snap so we don't mutate any of the embedded structs // etc on future updates. snapCopy, err := snap.Clone() @@ -307,9 +309,6 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) { case s.snapCh <- *snapCopy: s.logger.Trace("Delivered new snapshot to proxy config watchers") - // Allow the next change to trigger a send - coalesceTimer = nil - // Skip rest of loop - there is nothing to send since nothing changed on // this iteration continue From b046d8c8c639b253e54c53eb702caf0e57b242e6 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 28 Dec 2021 23:27:06 -0500 Subject: [PATCH 2/6] add change log --- .changelog/11924.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/11924.txt diff --git a/.changelog/11924.txt b/.changelog/11924.txt new file mode 100644 index 000000000000..d76445017eaf --- /dev/null +++ b/.changelog/11924.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: fix a deadlock when the snapshot channel already has a snapshot to be consumed. +``` From c6c37848812cbbcfe9a0670b8b9351722bea79bc Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 4 Jan 2022 11:42:15 -0500 Subject: [PATCH 3/6] refactor to add relevant test. 
--- agent/proxycfg/manager.go | 22 ++--- agent/proxycfg/manager_test.go | 145 ++++++++++++++++++++++++++++++++- 2 files changed, 156 insertions(+), 11 deletions(-) diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index 083291c1371a..d5d102b1efd2 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -127,7 +127,7 @@ func (m *Manager) Run() error { defer m.State.StopNotify(stateCh) for { - m.syncState() + m.syncState(m.notifyBroadcast) // Wait for a state change _, ok := <-stateCh @@ -140,7 +140,7 @@ func (m *Manager) Run() error { // syncState is called whenever the local state notifies a change. It holds the // lock while finding any new or updated proxies and removing deleted ones. -func (m *Manager) syncState() { +func (m *Manager) syncState(notifyBroadcast func(ch <-chan ConfigSnapshot)) { m.mu.Lock() defer m.mu.Unlock() @@ -160,7 +160,7 @@ func (m *Manager) syncState() { // know that so we'd need to set it here if not during registration of the // proxy service. Sidecar Service in the interim can do that, but we should // validate more generally that that is always true. - err := m.ensureProxyServiceLocked(svc) + err := m.ensureProxyServiceLocked(svc, notifyBroadcast) if err != nil { m.Logger.Error("failed to watch proxy service", "service", sid.String(), @@ -179,7 +179,7 @@ func (m *Manager) syncState() { } // ensureProxyServiceLocked adds or changes the proxy to our state. -func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error { +func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, notifyBroadcast func(ch <-chan ConfigSnapshot)) error { sid := ns.CompoundServiceID() // Retrieve the token used to register the service, or fallback to the @@ -227,16 +227,18 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error { m.proxies[sid] = state // Start a goroutine that will wait for changes and broadcast them to watchers. 
- go func(ch <-chan ConfigSnapshot) { - // Run until ch is closed - for snap := range ch { - m.notify(&snap) - } - }(ch) + go notifyBroadcast(ch) return nil } +func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) { + // Run until ch is closed + for snap := range ch { + m.notify(&snap) + } +} + // removeProxyService is called when a service deregisters and frees all // resources for that service. func (m *Manager) removeProxyServiceLocked(proxyID structs.ServiceID) { diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 9e659b5f0df8..cfcf74ec47c9 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -598,7 +598,150 @@ func TestManager_SyncState_DefaultToken(t *testing.T) { err = state.AddServiceWithChecks(srv, nil, "") require.NoError(t, err) - m.syncState() + m.syncState(m.notifyBroadcast) require.Equal(t, "default-token", m.proxies[srv.CompoundServiceID()].serviceInstance.token) } + +func TestManager_SyncState_No_Notify(t *testing.T) { + types := NewTestCacheTypes(t) + c := TestCacheWithTypes(t, types) + logger := testutil.Logger(t) + tokens := new(token.Store) + tokens.UpdateUserToken("default-token", token.TokenSourceConfig) + + state := local.NewState(local.Config{}, logger, tokens) + state.TriggerSyncChanges = func() {} + + m, err := NewManager(ManagerConfig{ + Cache: c, + Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName}, + State: state, + Tokens: tokens, + Source: &structs.QuerySource{Datacenter: "dc1"}, + Logger: logger, + }) + require.NoError(t, err) + defer m.Close() + + srv := &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-sidecar-proxy", + Service: "web-sidecar-proxy", + Port: 9999, + Meta: map[string]string{}, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceID: "web", + DestinationServiceName: "web", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 8080, + Config: map[string]interface{}{ + "foo": "bar", + }, + }, + } + + 
err = state.AddServiceWithChecks(srv, nil, "") + require.NoError(t, err) + + readEvent := make(chan bool, 1) + snapSent := make(chan bool, 1) + + m.syncState(func(ch <-chan ConfigSnapshot) { + for { + <-readEvent + snap := <-ch + m.notify(&snap) + snapSent <- true + } + }) + + // Get the relevant notification Channel, should only have 1 + services := m.State.AllServices() + var notifyCH chan cache.UpdateEvent + + for sid, _ := range services { + notifyCH = m.proxies[sid].ch + } + + // update the leaf certs + roots, issuedCert := TestCerts(t) + notifyCH <- cache.UpdateEvent{ + CorrelationID: leafWatchID, + Result: issuedCert, + Err: nil, + } + // at this point the snapshot should not be valid and not be sent + after := time.After(200 * time.Millisecond) + select { + case <-snapSent: + t.Fatal("snap should not be valid") + case <-after: + + } + + // update the root certs + notifyCH <- cache.UpdateEvent{ + CorrelationID: rootsWatchID, + Result: roots, + Err: nil, + } + + // at this point the snapshot should not be valid and not be sent + after = time.After(200 * time.Millisecond) + select { + case <-snapSent: + t.Fatal("snap should not be valid") + case <-after: + + } + + //prepare to read a snapshot update as the next update should make the snapshot valid + readEvent <- true + // update the intensions + notifyCH <- cache.UpdateEvent{ + CorrelationID: intentionsWatchID, + Result: &structs.IndexedIntentionMatches{}, + Err: nil, + } + + // at this point we have a valid snapshot + after = time.After(500 * time.Millisecond) + select { + case <-snapSent: + case <-after: + t.Fatal("snap should be valid") + + } + + // send two snapshots back to back without reading them to overflow the snapshot channel and get to the default use case + for i := 0; i < 2; i++ { + time.Sleep(250 * time.Millisecond) + notifyCH <- cache.UpdateEvent{ + CorrelationID: leafWatchID, + Result: issuedCert, + Err: nil, + } + } + + // make sure that we are not receiving any snapshot and wait for the 
snapshots to be processed + after = time.After(500 * time.Millisecond) + select { + case <-snapSent: + t.Fatal("snap should not be sent") + case <-after: + } + + // now make sure that both snapshots got propagated + for i := 0; i < 2; i++ { + + readEvent <- true + after = time.After(500 * time.Millisecond) + select { + case <-snapSent: + case <-after: + t.Fatal("snap should be valid") + + } + } +} From f68b1a4a77741ae4c293571efd32b943971ec9ab Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 4 Jan 2022 11:58:07 -0500 Subject: [PATCH 4/6] fix linter --- agent/proxycfg/manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index cfcf74ec47c9..3be46e4e69c2 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -660,7 +660,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) { services := m.State.AllServices() var notifyCH chan cache.UpdateEvent - for sid, _ := range services { + for sid := range services { notifyCH = m.proxies[sid].ch } From 473cc8a2b6efe8f99072bc3d88ce741729ea486b Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 5 Jan 2022 10:28:36 -0500 Subject: [PATCH 5/6] Apply suggestions from code review Co-authored-by: Freddy --- agent/proxycfg/manager_test.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 3be46e4e69c2..cdb271ee354e 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -657,12 +657,7 @@ func TestManager_SyncState_No_Notify(t *testing.T) { }) // Get the relevant notification Channel, should only have 1 - services := m.State.AllServices() - var notifyCH chan cache.UpdateEvent - - for sid := range services { - notifyCH = m.proxies[sid].ch - } + notifyCH := m.proxies[srv.CompoundServiceID()].ch // update the leaf certs roots, issuedCert := TestCerts(t) @@ -696,9 +691,10 @@ func 
TestManager_SyncState_No_Notify(t *testing.T) { } - //prepare to read a snapshot update as the next update should make the snapshot valid + // prepare to read a snapshot update as the next update should make the snapshot valid readEvent <- true - // update the intensions + + // update the intentions notifyCH <- cache.UpdateEvent{ CorrelationID: intentionsWatchID, Result: &structs.IndexedIntentionMatches{}, From b1ce653bd957f62cff47a90d233886cc86ccba25 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 5 Jan 2022 10:29:24 -0500 Subject: [PATCH 6/6] remove non needed check --- agent/proxycfg/state.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index db65a7df34b9..586bc393868d 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -319,11 +319,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) { s.logger.Trace("Failed to deliver new snapshot to proxy config watchers") // Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly. - if coalesceTimer == nil { - coalesceTimer = time.AfterFunc(coalesceTimeout, func() { - sendCh <- struct{}{} - }) - } + coalesceTimer = time.AfterFunc(coalesceTimeout, func() { + sendCh <- struct{}{} + }) // Do not reset coalesceTimer since we just queued a timer-based refresh continue