Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid potential proxycfg/xDS deadlock using non-blocking send #9689

Merged
merged 7 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,11 @@ func (s *state) run() {
case <-s.ctx.Done():
return
case u := <-s.ch:
s.logger.Trace("A blocking query returned; handling snapshot update")

if err := s.handleUpdate(u, &snap); err != nil {
s.logger.Error("watch error",
"id", u.CorrelationID,
"error", err,
s.logger.Error("Failed to handle update from watch",
"id", u.CorrelationID, "error", err,
)
continue
}
Expand All @@ -617,31 +618,52 @@ func (s *state) run() {
snapCopy, err := snap.Clone()
if err != nil {
s.logger.Error("Failed to copy config snapshot for proxy",
"proxy", s.proxyID,
"error", err,
)
continue
}
s.snapCh <- *snapCopy
// Allow the next change to trigger a send
coalesceTimer = nil

// Skip rest of loop - there is nothing to send since nothing changed on
// this iteration
continue
select {
// try to send
case s.snapCh <- *snapCopy:
s.logger.Trace("Delivered new snapshot to proxy config watchers")

// Allow the next change to trigger a send
coalesceTimer = nil

// Skip rest of loop - there is nothing to send since nothing changed on
// this iteration
continue

// avoid blocking if a snapshot is already buffered, but queue up a retry with a timer
default:
s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")

if coalesceTimer == nil {
freddygv marked this conversation as resolved.
Show resolved Hide resolved
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
sendCh <- struct{}{}
})
}

// Do not reset coalesceTimer since we just queued a timer-based refresh
continue
}

case replyCh := <-s.reqCh:
s.logger.Trace("A proxy config snapshot was requested")

if !snap.Valid() {
// Not valid yet just respond with nil and move on to next task.
replyCh <- nil

s.logger.Trace("The proxy's config snapshot is not valid yet")
continue
}
// Make a deep copy of snap so we don't mutate any of the embedded structs
// etc on future updates.
snapCopy, err := snap.Clone()
if err != nil {
s.logger.Error("Failed to copy config snapshot for proxy",
"proxy", s.proxyID,
"error", err,
)
continue
Expand Down
12 changes: 12 additions & 0 deletions agent/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/hashicorp/consul/logging"
"sync/atomic"
"time"

Expand Down Expand Up @@ -164,6 +165,8 @@ const (
)

func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
logger := s.Logger.Named(logging.XDS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar comment here about using With

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that would work here since the proxy ID isn't known until the xDS server state machine is running


// xDS requires a unique nonce to correlate response/request pairs
var nonce uint64

Expand Down Expand Up @@ -324,6 +327,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
// state machine.
defer watchCancel()

logger.Trace("watching proxy, pending initial proxycfg snapshot",
"service_id", proxyID.String())

// Now wait for the config so we can check ACL
state = statePendingInitialConfig
case statePendingInitialConfig:
Expand All @@ -335,6 +341,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
// Got config, try to authenticate next.
state = stateRunning

logger.Trace("Got initial config snapshot",
"service_id", cfgSnap.ProxyID.String())

// Let's actually process the config we just got or we'll miss responding
fallthrough
case stateRunning:
Expand All @@ -346,6 +355,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
// timer is first started.
extendAuthTimer()

logger.Trace("Invoking all xDS resource handlers and sending new data if there is any",
"service_id", cfgSnap.ProxyID.String())

// See if any handlers need to have the current (possibly new) config
// sent. Note the order here is actually significant so we can't just
// range the map which has no determined order. It's important because:
Expand Down
1 change: 1 addition & 0 deletions logging/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ const (
UIMetricsProxy string = "ui_metrics_proxy"
WAN string = "wan"
Watch string = "watch"
XDS string = "xds"
Vault string = "vault"
)