Avoid potential proxycfg/xDS deadlock using non-blocking send #9689

Merged · 7 commits · Feb 8, 2021

Changes from 2 commits
45 changes: 35 additions & 10 deletions agent/proxycfg/state.go
@@ -583,6 +583,8 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
}

func (s *state) run() {
logger := s.logger.Named(logging.ProxyConfig)
Member:

Non-blocking idea:

Use the With() call here and tack on the proxy-id logging key/value pair so that all messages emitted by the logger automatically get proxy-id attached.
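A minimal sketch of that suggestion, assuming the hclog-style With() used elsewhere in Consul (the "proxy-id" field name here is illustrative):

    // Derive a logger that attaches the proxy ID to every message it emits,
    // so individual log calls no longer need to repeat the key/value pair.
    logger := s.logger.Named(logging.ProxyConfig).With("proxy-id", s.proxyID.String())

    // Later calls pick up proxy-id automatically.
    logger.Trace("A blocking query returned; handling snapshot update")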

Contributor Author:

Ah, didn't know about .With(), done

Contributor Author:

Actually the labeling was already taken care of:

Logger: a.logger.Named(logging.ProxyConfig),

state.logger = m.Logger.With("service_id", sid.String())

Removed it from here.


// Close the channel we return from Watch when we stop so consumers can stop
// watching and clean up their goroutines. It's important we do this here and
// not in Close since this routine sends on this chan and so might panic if it
@@ -603,10 +605,13 @@ func (s *state) run() {
case <-s.ctx.Done():
return
case u := <-s.ch:
logger.Trace("A blocking query returned; handling snapshot update",
"proxy-id", s.proxyID.String(),
)

if err := s.handleUpdate(u, &snap); err != nil {
s.logger.Error("watch error",
"id", u.CorrelationID,
"error", err,
logger.Error("Failed to handle update from watch",
"id", u.CorrelationID, "error", err,
)
continue
}
@@ -616,13 +621,26 @@ func (s *state) run() {
// etc on future updates.
snapCopy, err := snap.Clone()
if err != nil {
s.logger.Error("Failed to copy config snapshot for proxy",
"proxy", s.proxyID,
"error", err,
logger.Error("Failed to copy config snapshot for proxy",
"proxy-id", s.proxyID.String(), "error", err,
)
continue
}
s.snapCh <- *snapCopy

select {
// try to send
case s.snapCh <- *snapCopy:
logger.Trace("Delivered new snapshot to proxy config watchers",
"proxy-id", s.proxyID.String(),
)

// avoid blocking if a snapshot is already buffered
default:
logger.Trace("Failed to deliver new snapshot to proxy config watchers",
"proxy-id", s.proxyID.String(),
)
Member:

I see how this breaks the deadlock, but is it OK? If we just drop the new snapshot delivery on the floor, that means some change in proxy config that happened since the chan was last filled is now never going to get applied to any proxies consuming this state (at least until something else changes).

Isn't that just turning the deadlock into a silent (other than the trace log) delivery failure that won't automatically resolve?

I could understand if the chan were just a "something changed" edge signal for the proxy to reload the current config state, but it's not; it's delivering a concrete state, and if we do hit this race we'll just drop some valid config change on the floor, won't we?

Member (@banks, Feb 5, 2021):

Would it be correct if, instead of dropping the update entirely, we just scheduled a new delivery of it later (by sending on the sendCh again)?

That would unblock us here and release the deadlock, but then in a few milliseconds we'd be back and would get another chance to deliver the latest snapshot instead of dropping it.

We might have to only push to sendCh if it wouldn't block, but I think this is OK: the sendCh is just an edge trigger that causes re-reading of the latest snapshot when we re-enter this case, so we'd only need to send to sendCh if it wasn't already full and we could be sure that eventually the latest config will be delivered.

That said, I don't recall if sendCh is buffered; if not, we'd not be able to send to it directly here, as by definition nothing will be reading it while we are already in the loop that reads it... We'd either need to buffer it by 1 and then drop if full, or spawn a new goroutine with time.AfterFunc or something, but I think buffering is reasonable.
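A rough sketch of that buffered edge-trigger idea; sendCh and the buffer size come from the description above, while the surrounding shape is illustrative rather than copied from the branch:

    // Assumed to be declared once, before the run() loop:
    sendCh := make(chan struct{}, 1) // 1-buffered edge trigger read by the loop

    // Inside the loop, where the snapshot send happens:
    select {
    case s.snapCh <- *snapCopy:
        // Delivered.
    default:
        // Consumer is busy: queue a retry without blocking. If a retry signal
        // is already buffered, dropping this extra one is fine, because the
        // loop always re-reads the latest snapshot when it wakes up. An
        // immediate wake-up may retry too soon, which is why a delayed
        // re-send (time.AfterFunc) is the alternative mentioned above.
        select {
        case sendCh <- struct{}{}:
        default:
        }
    }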

Contributor Author:

We're already scheduling a new delivery of it though.

  1. After dropping the send, here we're resetting the coalesceTimer
  2. In a following loop iteration we fall into this block, where we re-create the coalesceTimer since the snapshot is valid
  3. After the timer elapses (200ms), we end up back in the case where we started

So then the question is: is 200ms too long?

Member:

Hmm, I'm not sure I follow.

We reset the timer, but then we continue, which means we skip the code at the bottom of the loop and start waiting for a further change. Doesn't that mean we will just wait, without sending the actual update, until something else triggers a change? That could be an unbounded amount of time later.

Contributor Author:

Ah yep, I mistakenly thought we would fall through the select statement in a later loop iteration even if nothing happened.
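One way to reconcile both concerns, sketched under assumptions rather than taken from the diff shown here (sendCh, the ~200ms coalesce delay, and coalesceTimer being a *time.Timer come from the reviewer's description): keep the non-blocking send, but on a failed delivery re-arm the coalesce timer so the latest snapshot is retried shortly instead of waiting for an unrelated change.

    select {
    case s.snapCh <- *snapCopy:
        logger.Trace("Delivered new snapshot to proxy config watchers",
            "proxy-id", s.proxyID.String(),
        )
        // Allow the next change to trigger a send.
        coalesceTimer = nil

    default:
        logger.Trace("Failed to deliver new snapshot to proxy config watchers; scheduling retry",
            "proxy-id", s.proxyID.String(),
        )
        // Wake the loop again after the coalesce delay so the (by then latest)
        // snapshot gets another delivery attempt.
        coalesceTimer = time.AfterFunc(200*time.Millisecond, func() {
            sendCh <- struct{}{}
        })
    }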

}

// Allow the next change to trigger a send
coalesceTimer = nil

@@ -631,18 +649,25 @@ func (s *state) run() {
continue

case replyCh := <-s.reqCh:
logger.Trace("A proxy config snapshot was requested",
"proxy-id", s.proxyID.String(),
)

if !snap.Valid() {
// Not valid yet just respond with nil and move on to next task.
replyCh <- nil

logger.Trace("The proxy's config snapshot is not valid yet",
"proxy-id", s.proxyID.String(),
)
continue
}
// Make a deep copy of snap so we don't mutate any of the embedded structs
// etc on future updates.
snapCopy, err := snap.Clone()
if err != nil {
s.logger.Error("Failed to copy config snapshot for proxy",
"proxy", s.proxyID,
"error", err,
logger.Error("Failed to copy config snapshot for proxy",
"proxy-id", s.proxyID.String(), "error", err,
)
continue
}
12 changes: 12 additions & 0 deletions agent/xds/server.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/hashicorp/consul/logging"
"sync/atomic"
"time"

@@ -164,6 +165,8 @@ const (
)

func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
logger := s.Logger.Named(logging.XDS)
Member:

Similar comment here about using With().

Contributor Author:

I'm not sure that would work here, since the proxy ID isn't known until the xDS server state machine is running.
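A sketch of that constraint; the placement is illustrative, assuming proxyID is only parsed from the first DiscoveryRequest once the state machine is running:

    // At the top of process() the proxy ID isn't available yet:
    logger := s.Logger.Named(logging.XDS)

    // Only once the first request has been handled and proxyID is known could
    // a derived logger attach it to all subsequent messages:
    logger = logger.With("proxy-id", proxyID.String())
    logger.Trace("watching proxy, pending initial proxycfg snapshot")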


// xDS requires a unique nonce to correlate response/request pairs
var nonce uint64

@@ -324,6 +327,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
// state machine.
defer watchCancel()

logger.Trace("watching proxy, pending initial proxycfg snapshot",
"proxy-id", proxyID.String())

// Now wait for the config so we can check ACL
state = statePendingInitialConfig
case statePendingInitialConfig:
@@ -335,6 +341,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
// Got config, try to authenticate next.
state = stateRunning

logger.Trace("Got initial config snapshot",
"proxy-id", cfgSnap.ProxyID.String())

// Lets actually process the config we just got or we'll mis responding
fallthrough
case stateRunning:
@@ -346,6 +355,9 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error {
// timer is first started.
extendAuthTimer()

logger.Trace("Invoking all xDS resource handlers and sending new data if there is any",
"proxy-id", cfgSnap.ProxyID.String())

// See if any handlers need to have the current (possibly new) config
// sent. Note the order here is actually significant so we can't just
// range the map which has no determined order. It's important because:
1 change: 1 addition & 0 deletions logging/names.go
@@ -56,5 +56,6 @@ const (
UIMetricsProxy string = "ui_metrics_proxy"
WAN string = "wan"
Watch string = "watch"
XDS string = "xds"
Vault string = "vault"
)