Skip to content

Commit

Permalink
Prevent xDS tight loop on cfg errors (#12195)
Browse files Browse the repository at this point in the history
  • Loading branch information
freddygv committed Feb 10, 2022
1 parent 96e7671 commit 8ef36fd
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 29 deletions.
3 changes: 3 additions & 0 deletions .changelog/12195.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
xds: prevents tight loop where the Consul client agent would repeatedly re-send config that Envoy has rejected.
```
30 changes: 26 additions & 4 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
"github.com/hashicorp/consul/logging"
)

type deltaRecvResponse int

const (
deltaRecvResponseNack deltaRecvResponse = iota
deltaRecvResponseAck
deltaRecvNewSubscription
deltaRecvUnknownType
)

// ADSDeltaStream is a shorter way of referring to this thing...
type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer

Expand Down Expand Up @@ -175,8 +184,17 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
}

if handler, ok := handlers[req.TypeUrl]; ok {
if handler.Recv(req, generator.ProxyFeatures) {
switch handler.Recv(req, generator.ProxyFeatures) {
case deltaRecvNewSubscription:
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)

case deltaRecvResponseNack:
generator.Logger.Trace("got nack response for type", "typeUrl", req.TypeUrl)

// There is no reason to believe that generating new xDS resources from the same snapshot
// would lead to an ACK from Envoy. Instead we continue to the top of this for loop and wait
// for a new request or snapshot.
continue
}
}

Expand Down Expand Up @@ -430,9 +448,9 @@ func newDeltaType(
// Recv handles new discovery requests from envoy.
//
// Returns true the first time a type receives a request.
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) bool {
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) deltaRecvResponse {
if t == nil {
return false // not something we care about
return deltaRecvUnknownType // not something we care about
}
logger := t.generator.Logger.With("typeUrl", t.typeURL)

Expand Down Expand Up @@ -487,6 +505,7 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce,
"error", status.ErrorProto(req.ErrorDetail))
t.nack(req.ResponseNonce)
return deltaRecvResponseNack
}
}

Expand Down Expand Up @@ -557,7 +576,10 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
}
}

return registeredThisTime
if registeredThisTime {
return deltaRecvNewSubscription
}
return deltaRecvResponseAck
}

func (t *xDSDeltaType) ack(nonce string) {
Expand Down
146 changes: 121 additions & 25 deletions agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,58 +203,154 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})

runStep(t, "simulate envoy NACKing an endpoint update", func(t *testing.T) {
// Trigger only an EDS update.
// NOTE: this has to be the last subtest since it kills the stream
runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
// Force sends to fail
envoy.SetSendErr(errors.New("test error"))

// Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment)
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, "db", "db.default.default.dc1")
mgr.DeliverConfig(t, sid, snap)

// Send envoy an EDS update.
// We never send any replies about this change because we died.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})

envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}

func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

sid := structs.NewServiceID("web-sidecar-proxy", nil)

// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)

var snap *proxycfg.ConfigSnapshot

runStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")

// Plug in a bad port for the public listener
snap.Port = 1

// Send initial cluster discover.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})

// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

requireProtocolVersionGauge(t, scenario, "v3", 1)

// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
})

runStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(6),
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db[0]"),
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})

envoy.SendDeltaReqNACK(t, EndpointType, 6, &rpcstatus.Status{})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})

// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)

// Send it again.
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(7),
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db[0]"),
makeTestEndpoints(t, snap, "tcp:db"),
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})

envoy.SendDeltaReqACK(t, EndpointType, 7)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)

// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)

// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
// Response contains public_listener with port that Envoy can't bind to
makeTestListener(t, snap, "tcp:bad_public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})

// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})

// NOTE: this has to be the last subtest since it kills the stream
runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
// Force sends to fail
envoy.SetSendErr(errors.New("test error"))
// NACKs the listener update due to the bad public listener
envoy.SendDeltaReqNACK(t, ListenerType, 3, &rpcstatus.Status{})

// Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment)
snap = newTestSnapshot(t, snap, "")
// Consul should not respond until a new snapshot is delivered
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})

runStep(t, "simulate envoy NACKing a listener update", func(t *testing.T) {
// Correct the port and deliver a new snapshot
snap.Port = 9999
mgr.DeliverConfig(t, sid, snap)

// We never send any replies about this change because we died.
// And should send a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(4),
Resources: makeTestResources(t,
// Send a public listener that Envoy will accept
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})

// New listener is acked now
envoy.SendDeltaReqACK(t, EndpointType, 4)

assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})

envoy.Close()
select {
case err := <-errCh:
// Envoy died.
expect := status.Errorf(codes.Unavailable,
"failed to send upsert reply for type %q: test error",
EndpointType)
require.EqualError(t, err, expect.Error())
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
Expand Down
24 changes: 24 additions & 0 deletions agent/xds/xds_protocol_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,30 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str

func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName string) *envoy_listener_v3.Listener {
switch fixtureName {
case "tcp:bad_public_listener":
return &envoy_listener_v3.Listener{
// Envoy can't bind to port 1
Name: "public_listener:0.0.0.0:1",
Address: makeAddress("0.0.0.0", 1),
TrafficDirection: envoy_core_v3.TrafficDirection_INBOUND,
FilterChains: []*envoy_listener_v3.FilterChain{
{
TransportSocket: xdsNewPublicTransportSocket(t, snap),
Filters: []*envoy_listener_v3.Filter{
xdsNewFilter(t, "envoy.filters.network.rbac", &envoy_network_rbac_v3.RBAC{
Rules: &envoy_rbac_v3.RBAC{},
StatPrefix: "connect_authz",
}),
xdsNewFilter(t, "envoy.filters.network.tcp_proxy", &envoy_tcp_proxy_v3.TcpProxy{
ClusterSpecifier: &envoy_tcp_proxy_v3.TcpProxy_Cluster{
Cluster: "local_app",
},
StatPrefix: "public_listener",
}),
},
},
},
}
case "tcp:public_listener":
return &envoy_listener_v3.Listener{
Name: "public_listener:0.0.0.0:9999",
Expand Down

0 comments on commit 8ef36fd

Please sign in to comment.