diff --git a/.changelog/11924.txt b/.changelog/11924.txt
new file mode 100644
index 000000000000..d76445017eaf
--- /dev/null
+++ b/.changelog/11924.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+xds: fix a deadlock when the snapshot channel already has a snapshot to be consumed.
+```
diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go
index c8e2ab8f2b1b..ce2a59c48792 100644
--- a/agent/proxycfg/manager.go
+++ b/agent/proxycfg/manager.go
@@ -125,7 +125,7 @@ func (m *Manager) Run() error {
 	defer m.State.StopNotify(stateCh)
 
 	for {
-		m.syncState()
+		m.syncState(m.notifyBroadcast)
 
 		// Wait for a state change
 		_, ok := <-stateCh
@@ -138,7 +138,7 @@
 
 // syncState is called whenever the local state notifies a change. It holds the
 // lock while finding any new or updated proxies and removing deleted ones.
-func (m *Manager) syncState() {
+func (m *Manager) syncState(notifyBroadcast func(ch <-chan ConfigSnapshot)) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
@@ -158,7 +158,7 @@
 		// know that so we'd need to set it here if not during registration of the
 		// proxy service. Sidecar Service in the interim can do that, but we should
 		// validate more generally that that is always true.
-		err := m.ensureProxyServiceLocked(svc)
+		err := m.ensureProxyServiceLocked(svc, notifyBroadcast)
 		if err != nil {
 			m.Logger.Error("failed to watch proxy service",
 				"service", sid.String(),
@@ -177,7 +177,7 @@
 }
 
 // ensureProxyServiceLocked adds or changes the proxy to our state.
-func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
+func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, notifyBroadcast func(ch <-chan ConfigSnapshot)) error {
 	sid := ns.CompoundServiceID()
 
 	// Retrieve the token used to register the service, or fallback to the
@@ -222,16 +222,18 @@
 	m.proxies[sid] = state
 
 	// Start a goroutine that will wait for changes and broadcast them to watchers.
-	go func(ch <-chan ConfigSnapshot) {
-		// Run until ch is closed
-		for snap := range ch {
-			m.notify(&snap)
-		}
-	}(ch)
+	go notifyBroadcast(ch)
 
 	return nil
 }
 
+func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) {
+	// Run until ch is closed
+	for snap := range ch {
+		m.notify(&snap)
+	}
+}
+
 // removeProxyService is called when a service deregisters and frees all
 // resources for that service.
 func (m *Manager) removeProxyServiceLocked(proxyID structs.ServiceID) {
diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go
index 87175c91bdca..2bebeec43511 100644
--- a/agent/proxycfg/manager_test.go
+++ b/agent/proxycfg/manager_test.go
@@ -567,7 +567,146 @@ func TestManager_SyncState_DefaultToken(t *testing.T) {
 	err = state.AddServiceWithChecks(srv, nil, "")
 	require.NoError(t, err)
 
-	m.syncState()
+	m.syncState(m.notifyBroadcast)
 
 	require.Equal(t, "default-token", m.proxies[srv.CompoundServiceID()].token)
 }
+
+func TestManager_SyncState_No_Notify(t *testing.T) {
+	types := NewTestCacheTypes(t)
+	c := TestCacheWithTypes(t, types)
+	logger := testutil.Logger(t)
+	tokens := new(token.Store)
+	tokens.UpdateUserToken("default-token", token.TokenSourceConfig)
+
+	state := local.NewState(local.Config{}, logger, tokens)
+	state.TriggerSyncChanges = func() {}
+
+	m, err := NewManager(ManagerConfig{
+		Cache:  c,
+		Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName},
+		State:  state,
+		Tokens: tokens,
+		Source: &structs.QuerySource{Datacenter: "dc1"},
+		Logger: logger,
+	})
+	require.NoError(t, err)
+	defer m.Close()
+
+	srv := &structs.NodeService{
+		Kind:    structs.ServiceKindConnectProxy,
+		ID:      "web-sidecar-proxy",
+		Service: "web-sidecar-proxy",
+		Port:    9999,
+		Meta:    map[string]string{},
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceID:   "web",
+			DestinationServiceName: "web",
+			LocalServiceAddress:    "127.0.0.1",
+			LocalServicePort:       8080,
+			Config: map[string]interface{}{
+				"foo": "bar",
+			},
+		},
+	}
+
+	err = state.AddServiceWithChecks(srv, nil, "")
+	require.NoError(t, err)
+
+	readEvent := make(chan bool, 1)
+	snapSent := make(chan bool, 1)
+
+	m.syncState(func(ch <-chan ConfigSnapshot) {
+		for {
+			<-readEvent
+			snap := <-ch
+			m.notify(&snap)
+			snapSent <- true
+		}
+	})
+
+	// Get the relevant notification channel; there should only be one
+	notifyCH := m.proxies[srv.CompoundServiceID()].ch
+
+	// update the leaf certs
+	roots, issuedCert := TestCerts(t)
+	notifyCH <- cache.UpdateEvent{
+		CorrelationID: leafWatchID,
+		Result:        issuedCert,
+		Err:           nil,
+	}
+	// at this point the snapshot should not be valid and not be sent
+	after := time.After(200 * time.Millisecond)
+	select {
+	case <-snapSent:
+		t.Fatal("snap should not be valid")
+	case <-after:
+
+	}
+
+	// update the root certs
+	notifyCH <- cache.UpdateEvent{
+		CorrelationID: rootsWatchID,
+		Result:        roots,
+		Err:           nil,
+	}
+
+	// at this point the snapshot should not be valid and not be sent
+	after = time.After(200 * time.Millisecond)
+	select {
+	case <-snapSent:
+		t.Fatal("snap should not be valid")
+	case <-after:
+
+	}
+
+	// prepare to read a snapshot update, as the next update should make the snapshot valid
+	readEvent <- true
+
+	// update the intentions
+	notifyCH <- cache.UpdateEvent{
+		CorrelationID: intentionsWatchID,
+		Result:        &structs.IndexedIntentionMatches{},
+		Err:           nil,
+	}
+
+	// at this point we have a valid snapshot
+	after = time.After(500 * time.Millisecond)
+	select {
+	case <-snapSent:
+	case <-after:
+		t.Fatal("snap should be valid")
+
+	}
+
+	// send two snapshots back to back without reading them, to overflow the snapshot channel and reach the non-blocking default case
+	for i := 0; i < 2; i++ {
+		time.Sleep(250 * time.Millisecond)
+		notifyCH <- cache.UpdateEvent{
+			CorrelationID: leafWatchID,
+			Result:        issuedCert,
+			Err:           nil,
+		}
+	}
+
+	// make sure that we are not receiving any snapshot and wait for the snapshots to be processed
+	after = time.After(500 * time.Millisecond)
+	select {
+	case <-snapSent:
+		t.Fatal("snap should not be sent")
+	case <-after:
+	}
+
+	// now make sure that both snapshots got propagated
+	for i := 0; i < 2; i++ {
+
+		readEvent <- true
+		after = time.After(500 * time.Millisecond)
+		select {
+		case <-snapSent:
+		case <-after:
+			t.Fatal("snap should be valid")
+
+		}
+	}
+}
diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index 59d11a2f1195..d2f87236c6f4 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -613,6 +613,8 @@ func (s *state) run() {
 			}
 
 		case <-sendCh:
+			// Allow the next change to trigger a send
+			coalesceTimer = nil
 			// Make a deep copy of snap so we don't mutate any of the embedded structs
 			// etc on future updates.
 			snapCopy, err := snap.Clone()
@@ -628,9 +630,6 @@
 			case s.snapCh <- *snapCopy:
 				s.logger.Trace("Delivered new snapshot to proxy config watchers")
 
-				// Allow the next change to trigger a send
-				coalesceTimer = nil
-
 				// Skip rest of loop - there is nothing to send since nothing changed on
 				// this iteration
 				continue
@@ -641,11 +640,9 @@
 				s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
 
 				// Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
-				if coalesceTimer == nil {
-					coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
-						sendCh <- struct{}{}
-					})
-				}
+				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
+					sendCh <- struct{}{}
+				})
 
 				// Do not reset coalesceTimer since we just queued a timer-based refresh
 				continue
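
For readers skimming the patch, the sketch below (not part of the change) illustrates the delivery pattern the state.go hunks arrive at: clear the coalesce timer before attempting delivery, try a non-blocking send on the buffered snapshot channel, and schedule a timer-based retry when the buffer is still full. Previously the retry was guarded by `if coalesceTimer == nil` while the timer was only cleared after a successful send, so a failed delivery could leave the updated snapshot undelivered. The channel names, the string snapshot type, and the timings below are invented for illustration only.

```go
package main

import (
	"fmt"
	"time"
)

// run is a minimal, simplified stand-in for the snapshot delivery loop in
// state.run: updates are coalesced, delivery is attempted with a non-blocking
// send, and a full snapshot channel triggers a timer-based retry instead of
// blocking. A plain string stands in for ConfigSnapshot; updateCh, snapCh and
// the timings are invented for this sketch.
func run(updateCh <-chan string, snapCh chan<- string, done <-chan struct{}) {
	const coalesceTimeout = 200 * time.Millisecond

	var snap string
	var coalesceTimer *time.Timer
	sendCh := make(chan struct{}, 1)

	for {
		select {
		case <-done:
			return

		case u := <-updateCh:
			snap = u
			// Coalesce bursts of updates into a single send.
			if coalesceTimer == nil {
				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
					sendCh <- struct{}{}
				})
			}

		case <-sendCh:
			// Allow the next change to trigger a send, even if this delivery
			// attempt fails (this mirrors moving `coalesceTimer = nil` to the
			// top of the case in the patch).
			coalesceTimer = nil

			select {
			case snapCh <- snap:
				// Delivered to the watcher.
			default:
				// The buffered snapshot channel still holds an unread snapshot.
				// Don't block; retry shortly, unconditionally (unlike the old
				// `if coalesceTimer == nil` guard).
				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
					sendCh <- struct{}{}
				})
			}
		}
	}
}

func main() {
	updateCh := make(chan string)
	snapCh := make(chan string, 1) // buffer of 1, like the proxycfg snapshot channel
	done := make(chan struct{})
	go run(updateCh, snapCh, done)

	updateCh <- "snap-1"
	time.Sleep(400 * time.Millisecond) // snap-1 is now sitting unread in snapCh

	updateCh <- "snap-2" // delivery finds snapCh full and schedules a retry

	time.Sleep(400 * time.Millisecond)
	fmt.Println(<-snapCh) // snap-1
	time.Sleep(400 * time.Millisecond)
	fmt.Println(<-snapCh) // snap-2, delivered by the timer-based retry
	close(done)
}
```

Running the sketch should print snap-1 followed by snap-2: the second snapshot is still delivered by the retry even though the first one sat unread in the buffer, which is the behavior the new TestManager_SyncState_No_Notify test exercises against the real code.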