From 37190c0d0d732e20edef847d5ccf1deb9be187c7 Mon Sep 17 00:00:00 2001 From: freddygv Date: Tue, 2 Feb 2021 11:31:14 -0700 Subject: [PATCH 1/7] Avoid potential deadlock using non-blocking send Deadlock scenario: 1. Due to scheduling, the state runner sends one snapshot into snapCh and then attempts to send a second. The first send succeeds because the channel is buffered, but the second blocks. 2. Separately, Manager.Watch is called by the xDS server after getting a discovery request from Envoy. This function acquires the manager lock and then blocks on receiving the CurrentSnapshot from the state runner. 3. Separately, there is a Manager goroutine that reads the snapshots from the channel in step 1. These reads are done to notify proxy watchers, but they require holding the manager lock. This goroutine goes to acquire that lock, but can't because it is held by step 2. Now, the goroutine from step 3 is waiting on the one from step 2 to release the lock. The goroutine from step 2 won't release the lock until the goroutine in step 1 advances. But the goroutine in step 1 is waiting for the one in step 3. Deadlock. By making this send non-blocking, step 1 above can proceed. The coalesce timer will be reset, and a new valid snapshot will be delivered after it elapses or when one is requested by xDS. 
--- agent/proxycfg/state.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 703cb66afcdc..59968e83e6a4 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -622,7 +622,14 @@ func (s *state) run() { ) continue } - s.snapCh <- *snapCopy + + select { + case s.snapCh <- *snapCopy: + // try to send + default: + // avoid blocking if a snapshot is already buffered + } + // Allow the next change to trigger a send coalesceTimer = nil From 5ba14ad41df93ec7f0b3705b247016980a8dd215 Mon Sep 17 00:00:00 2001 From: freddygv Date: Tue, 2 Feb 2021 12:26:38 -0700 Subject: [PATCH 2/7] Add trace logs to proxycfg state runner and xds srv --- agent/proxycfg/state.go | 40 +++++++++++++++++++++++++++++----------- agent/xds/server.go | 12 ++++++++++++ logging/names.go | 1 + 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 59968e83e6a4..fd9bdc802c70 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -583,6 +583,8 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot { } func (s *state) run() { + logger := s.logger.Named(logging.ProxyConfig) + // Close the channel we return from Watch when we stop so consumers can stop // watching and clean up their goroutines. It's important we do this here and // not in Close since this routine sends on this chan and so might panic if it @@ -603,10 +605,13 @@ func (s *state) run() { case <-s.ctx.Done(): return case u := <-s.ch: + logger.Trace("A blocking query returned; handling snapshot update", + "proxy-id", s.proxyID.String(), + ) + if err := s.handleUpdate(u, &snap); err != nil { - s.logger.Error("watch error", - "id", u.CorrelationID, - "error", err, + logger.Error("Failed to handle update from watch", + "id", u.CorrelationID, "error", err, ) continue } @@ -616,18 +621,24 @@ func (s *state) run() { // etc on future updates. 
snapCopy, err := snap.Clone() if err != nil { - s.logger.Error("Failed to copy config snapshot for proxy", - "proxy", s.proxyID, - "error", err, + logger.Error("Failed to copy config snapshot for proxy", + "proxy-id", s.proxyID.String(), "error", err, ) continue } select { + // try to send case s.snapCh <- *snapCopy: - // try to send + logger.Trace("Delivered new snapshot to proxy config watchers", + "proxy-id", s.proxyID.String(), + ) + + // avoid blocking if a snapshot is already buffered default: - // avoid blocking if a snapshot is already buffered + logger.Trace("Failed to deliver new snapshot to proxy config watchers", + "proxy-id", s.proxyID.String(), + ) } // Allow the next change to trigger a send @@ -638,18 +649,25 @@ func (s *state) run() { continue case replyCh := <-s.reqCh: + logger.Trace("A proxy config snapshot was requested", + "proxy-id", s.proxyID.String(), + ) + if !snap.Valid() { // Not valid yet just respond with nil and move on to next task. replyCh <- nil + + logger.Trace("The proxy's config snapshot is not valid yet", + "proxy-id", s.proxyID.String(), + ) continue } // Make a deep copy of snap so we don't mutate any of the embedded structs // etc on future updates. 
snapCopy, err := snap.Clone() if err != nil { - s.logger.Error("Failed to copy config snapshot for proxy", - "proxy", s.proxyID, - "error", err, + logger.Error("Failed to copy config snapshot for proxy", + "proxy-id", s.proxyID.String(), "error", err, ) continue } diff --git a/agent/xds/server.go b/agent/xds/server.go index d8a7ecbe5109..b2506a753c73 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/hashicorp/consul/logging" "sync/atomic" "time" @@ -164,6 +165,8 @@ const ( ) func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error { + logger := s.Logger.Named(logging.XDS) + // xDS requires a unique nonce to correlate response/request pairs var nonce uint64 @@ -324,6 +327,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) // state machine. defer watchCancel() + logger.Trace("watching proxy, pending initial proxycfg snapshot", + "proxy-id", proxyID.String()) + // Now wait for the config so we can check ACL state = statePendingInitialConfig case statePendingInitialConfig: @@ -335,6 +341,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) // Got config, try to authenticate next. state = stateRunning + logger.Trace("Got initial config snapshot", + "proxy-id", cfgSnap.ProxyID.String()) + // Lets actually process the config we just got or we'll mis responding fallthrough case stateRunning: @@ -346,6 +355,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) // timer is first started. extendAuthTimer() + logger.Trace("Invoking all xDS resource handlers and sending new data if there is any", + "proxy-id", cfgSnap.ProxyID.String()) + // See if any handlers need to have the current (possibly new) config // sent. Note the order here is actually significant so we can't just // range the map which has no determined order. 
It's important because: diff --git a/logging/names.go b/logging/names.go index 54499006153c..b9616b167a43 100644 --- a/logging/names.go +++ b/logging/names.go @@ -56,5 +56,6 @@ const ( UIMetricsProxy string = "ui_metrics_proxy" WAN string = "wan" Watch string = "watch" + XDS string = "xds" Vault string = "vault" ) From 95e7641faa78784d5f71eb2c9150b4bb9510e91e Mon Sep 17 00:00:00 2001 From: freddygv Date: Fri, 5 Feb 2021 15:14:49 -0700 Subject: [PATCH 3/7] Update proxycfg logging, labels were already attached --- agent/proxycfg/state.go | 32 ++++++++++---------------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index fd9bdc802c70..fcb0302eb415 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -583,8 +583,6 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot { } func (s *state) run() { - logger := s.logger.Named(logging.ProxyConfig) - // Close the channel we return from Watch when we stop so consumers can stop // watching and clean up their goroutines. It's important we do this here and // not in Close since this routine sends on this chan and so might panic if it @@ -605,12 +603,10 @@ func (s *state) run() { case <-s.ctx.Done(): return case u := <-s.ch: - logger.Trace("A blocking query returned; handling snapshot update", - "proxy-id", s.proxyID.String(), - ) + s.logger.Trace("A blocking query returned; handling snapshot update") if err := s.handleUpdate(u, &snap); err != nil { - logger.Error("Failed to handle update from watch", + s.logger.Error("Failed to handle update from watch", "id", u.CorrelationID, "error", err, ) continue @@ -621,8 +617,8 @@ func (s *state) run() { // etc on future updates. 
snapCopy, err := snap.Clone() if err != nil { - logger.Error("Failed to copy config snapshot for proxy", - "proxy-id", s.proxyID.String(), "error", err, + s.logger.Error("Failed to copy config snapshot for proxy", + "error", err, ) continue } @@ -630,15 +626,11 @@ func (s *state) run() { select { // try to send case s.snapCh <- *snapCopy: - logger.Trace("Delivered new snapshot to proxy config watchers", - "proxy-id", s.proxyID.String(), - ) + s.logger.Trace("Delivered new snapshot to proxy config watchers") // avoid blocking if a snapshot is already buffered default: - logger.Trace("Failed to deliver new snapshot to proxy config watchers", - "proxy-id", s.proxyID.String(), - ) + s.logger.Trace("Failed to deliver new snapshot to proxy config watchers") } // Allow the next change to trigger a send @@ -649,25 +641,21 @@ func (s *state) run() { continue case replyCh := <-s.reqCh: - logger.Trace("A proxy config snapshot was requested", - "proxy-id", s.proxyID.String(), - ) + s.logger.Trace("A proxy config snapshot was requested") if !snap.Valid() { // Not valid yet just respond with nil and move on to next task. replyCh <- nil - logger.Trace("The proxy's config snapshot is not valid yet", - "proxy-id", s.proxyID.String(), - ) + s.logger.Trace("The proxy's config snapshot is not valid yet") continue } // Make a deep copy of snap so we don't mutate any of the embedded structs // etc on future updates. 
snapCopy, err := snap.Clone() if err != nil { - logger.Error("Failed to copy config snapshot for proxy", - "proxy-id", s.proxyID.String(), "error", err, + s.logger.Error("Failed to copy config snapshot for proxy", + "error", err, ) continue } From de0cb1af7fe1188aac5fec554a312546be3e25fe Mon Sep 17 00:00:00 2001 From: freddygv Date: Fri, 5 Feb 2021 15:15:52 -0700 Subject: [PATCH 4/7] Make xDS labeling consistent with proxycfg --- agent/xds/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/agent/xds/server.go b/agent/xds/server.go index b2506a753c73..7986b80150a7 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -328,7 +328,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) defer watchCancel() logger.Trace("watching proxy, pending initial proxycfg snapshot", - "proxy-id", proxyID.String()) + "service_id", proxyID.String()) // Now wait for the config so we can check ACL state = statePendingInitialConfig @@ -342,7 +342,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) state = stateRunning logger.Trace("Got initial config snapshot", - "proxy-id", cfgSnap.ProxyID.String()) + "service_id", cfgSnap.ProxyID.String()) // Lets actually process the config we just got or we'll mis responding fallthrough @@ -356,7 +356,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) extendAuthTimer() logger.Trace("Invoking all xDS resource handlers and sending new data if there is any", - "proxy-id", cfgSnap.ProxyID.String()) + "service_id", cfgSnap.ProxyID.String()) // See if any handlers need to have the current (possibly new) config // sent. 
Note the order here is actually significant so we can't just From 6e443e55365caca08b80f156fd709e494f7d47ce Mon Sep 17 00:00:00 2001 From: freddygv Date: Fri, 5 Feb 2021 18:00:59 -0700 Subject: [PATCH 5/7] Retry send after timer fires, in case no updates occur --- agent/proxycfg/state.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index fcb0302eb415..ad1ef61ecf7d 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -628,17 +628,26 @@ func (s *state) run() { case s.snapCh <- *snapCopy: s.logger.Trace("Delivered new snapshot to proxy config watchers") - // avoid blocking if a snapshot is already buffered + // Allow the next change to trigger a send + coalesceTimer = nil + + // Skip rest of loop - there is nothing to send since nothing changed on + // this iteration + continue + + // avoid blocking if a snapshot is already buffered, but queue up a retry with a timer default: s.logger.Trace("Failed to deliver new snapshot to proxy config watchers") - } - // Allow the next change to trigger a send - coalesceTimer = nil + if coalesceTimer == nil { + coalesceTimer = time.AfterFunc(coalesceTimeout, func() { + sendCh <- struct{}{} + }) + } - // Skip rest of loop - there is nothing to send since nothing changed on - // this iteration - continue + // Do not reset coalesceTimer since we just queued a timer-based refresh + continue + } case replyCh := <-s.reqCh: s.logger.Trace("A proxy config snapshot was requested") From ec5f75776bf8c98df085c6bdfb362f343648d2a2 Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 8 Feb 2021 09:45:45 -0700 Subject: [PATCH 6/7] Update comments on avoiding proxycfg deadlock --- agent/proxycfg/state.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index ad1ef61ecf7d..59d11a2f1195 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -624,7 +624,7 
@@ func (s *state) run() { } select { - // try to send + // Try to send case s.snapCh <- *snapCopy: s.logger.Trace("Delivered new snapshot to proxy config watchers") @@ -635,10 +635,12 @@ func (s *state) run() { // this iteration continue - // avoid blocking if a snapshot is already buffered, but queue up a retry with a timer + // Avoid blocking if a snapshot is already buffered in snapCh as this can result in a deadlock. + // See PR #9689 for more details. default: s.logger.Trace("Failed to deliver new snapshot to proxy config watchers") + // Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly. if coalesceTimer == nil { coalesceTimer = time.AfterFunc(coalesceTimeout, func() { sendCh <- struct{}{} From 5dca98e870f6b7d61c4cd318fa9134488d1c1cac Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 8 Feb 2021 09:45:58 -0700 Subject: [PATCH 7/7] Add changelog entry --- .changelog/9689.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/9689.txt diff --git a/.changelog/9689.txt b/.changelog/9689.txt new file mode 100644 index 000000000000..85f78ac90e98 --- /dev/null +++ b/.changelog/9689.txt @@ -0,0 +1,3 @@ +```release-note:bug +proxycfg: avoid potential deadlock in delivering proxy snapshot to watchers. +``` \ No newline at end of file