diff --git a/xds/internal/xdsclient/transport/transport.go b/xds/internal/xdsclient/transport/transport.go
index 001552d7b479..64e23ecd9648 100644
--- a/xds/internal/xdsclient/transport/transport.go
+++ b/xds/internal/xdsclient/transport/transport.go
@@ -363,29 +363,21 @@ func (t *Transport) send(ctx context.Context) {
 	// The xDS protocol only requires that we send the node proto in the first
 	// discovery request on every stream. Sending the node proto in every
 	// request message wastes CPU resources on the client and the server.
-	sendNodeProto := true
+	sentNodeProto := false
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case stream = <-t.adsStreamCh:
 			// We have a new stream and we've to ensure that the node proto gets
-			// sent out in the first request on the stream. At this point, we
-			// might not have any registered watches. Setting this field to true
-			// here will ensure that the node proto gets sent out along with the
-			// discovery request when the first watch is registered.
-			if len(t.resources) == 0 {
-				sendNodeProto = true
-				continue
-			}
-
-			if !t.sendExisting(stream) {
+			// sent out in the first request on the stream.
+			var err error
+			if sentNodeProto, err = t.sendExisting(stream); err != nil {
 				// Send failed, clear the current stream. Attempt to resend will
 				// only be made after a new stream is created.
 				stream = nil
 				continue
 			}
-			sendNodeProto = false
 		case u, ok := <-t.adsRequestCh.Get():
 			if !ok {
 				// No requests will be sent after the adsRequestCh buffer is closed.
@@ -416,12 +408,12 @@ func (t *Transport) send(ctx context.Context) {
 				// sending response back).
 				continue
 			}
-			if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, resources, url, version, nonce, nackErr); err != nil {
+			if err := t.sendAggregatedDiscoveryServiceRequest(stream, !sentNodeProto, resources, url, version, nonce, nackErr); err != nil {
 				t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, version, nonce, err)
 				// Send failed, clear the current stream.
 				stream = nil
 			}
-			sendNodeProto = false
+			sentNodeProto = true
 		}
 	}
 }
@@ -433,7 +425,9 @@ func (t *Transport) send(ctx context.Context) {
 // that here because the stream has just started and Send() usually returns
 // quickly (once it pushes the message onto the transport layer) and is only
 // ever blocked if we don't have enough flow control quota.
-func (t *Transport) sendExisting(stream adsStream) bool {
+//
+// Returns true if the node proto was sent.
+func (t *Transport) sendExisting(stream adsStream) (sentNodeProto bool, err error) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 
@@ -450,16 +444,18 @@ func (t *Transport) sendExisting(stream adsStream) bool {
 	t.nonces = make(map[string]string)
 
 	// Send node proto only in the first request on the stream.
-	sendNodeProto := true
 	for url, resources := range t.resources {
-		if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, mapToSlice(resources), url, t.versions[url], "", nil); err != nil {
+		if len(resources) == 0 {
+			continue
+		}
+		if err := t.sendAggregatedDiscoveryServiceRequest(stream, !sentNodeProto, mapToSlice(resources), url, t.versions[url], "", nil); err != nil {
 			t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, t.versions[url], "", err)
-			return false
+			return false, err
 		}
-		sendNodeProto = false
+		sentNodeProto = true
 	}
 
-	return true
+	return sentNodeProto, nil
 }
 
 // recv receives xDS responses on the provided ADS stream and branches out to
diff --git a/xds/internal/xdsclient/transport/transport_resource_test.go b/xds/internal/xdsclient/transport/transport_resource_test.go
index 43ec82ae74ed..35b5219ae6ca 100644
--- a/xds/internal/xdsclient/transport/transport_resource_test.go
+++ b/xds/internal/xdsclient/transport/transport_resource_test.go
@@ -21,6 +21,7 @@ package transport_test
 
 import (
 	"context"
+	"errors"
 	"testing"
 	"time"
 
@@ -217,3 +218,193 @@ func (s) TestHandleResponseFromManagementServer(t *testing.T) {
 		})
 	}
 }
+
+func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	mgmtServer, cleanup := startFakeManagementServer(t)
+	defer cleanup()
+	t.Logf("Started xDS management server on %s", mgmtServer.Address)
+	nodeProto := &v3corepb.Node{Id: uuid.New().String()}
+	tr, err := transport.New(transport.Options{
+		ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
+		OnRecvHandler: func(update transport.ResourceUpdate) error {
+			return nil
+		},
+		OnSendHandler:  func(*transport.ResourceSendInfo) {},                // No onSend handling.
+		OnErrorHandler: func(error) {},                                      // No stream error handling.
+		Backoff:        func(int) time.Duration { return time.Duration(0) }, // No backoff.
+		NodeProto:      nodeProto,
+	})
+	if err != nil {
+		t.Fatalf("Failed to create xDS transport: %v", err)
+	}
+	defer tr.Close()
+
+	// Send a request for a listener resource.
+	const resource = "some-resource"
+	tr.SendRequest(version.V3ListenerURL, []string{resource})
+
+	// Ensure the proper request was sent.
+	val, err := mgmtServer.XDSRequestChan.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Error waiting for mgmt server response: %v", err)
+	}
+	wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
+		Node:          nodeProto,
+		ResourceNames: []string{resource},
+		TypeUrl:       "type.googleapis.com/envoy.config.listener.v3.Listener",
+	}}
+	gotReq := val.(*fakeserver.Request)
+	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
+		t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
+	}
+
+	// Remove the subscription by requesting an empty list.
+	tr.SendRequest(version.V3ListenerURL, []string{})
+
+	// Ensure the proper request was sent.
+	val, err = mgmtServer.XDSRequestChan.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Error waiting for mgmt server response: %v", err)
+	}
+	wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
+		ResourceNames: []string{},
+		TypeUrl:       "type.googleapis.com/envoy.config.listener.v3.Listener",
+	}}
+	gotReq = val.(*fakeserver.Request)
+	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
+		t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
+	}
+
+	// Cause the stream to restart.
+	mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}
+
+	// Ensure no request is sent since there are no resources.
+	ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer cancel()
+	if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
+		t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
+	}
+
+	tr.SendRequest(version.V3ListenerURL, []string{resource})
+
+	// Ensure the proper request was sent with the node proto.
+	val, err = mgmtServer.XDSRequestChan.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Error waiting for mgmt server response: %v", err)
+	}
+	wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
+		Node:          nodeProto,
+		ResourceNames: []string{resource},
+		TypeUrl:       "type.googleapis.com/envoy.config.listener.v3.Listener",
+	}}
+	gotReq = val.(*fakeserver.Request)
+	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
+		t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
+	}
+
+}
+
+func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	mgmtServer, cleanup := startFakeManagementServer(t)
+	defer cleanup()
+	t.Logf("Started xDS management server on %s", mgmtServer.Address)
+	nodeProto := &v3corepb.Node{Id: uuid.New().String()}
+	tr, err := transport.New(transport.Options{
+		ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
+		OnRecvHandler: func(update transport.ResourceUpdate) error {
+			return nil
+		},
+		OnSendHandler:  func(*transport.ResourceSendInfo) {},                // No onSend handling.
+		OnErrorHandler: func(error) {},                                      // No stream error handling.
+		Backoff:        func(int) time.Duration { return time.Duration(0) }, // No backoff.
+		NodeProto:      nodeProto,
+	})
+	if err != nil {
+		t.Fatalf("Failed to create xDS transport: %v", err)
+	}
+	defer tr.Close()
+
+	// Send a request for a listener resource.
+	const resource = "some-resource"
+	tr.SendRequest(version.V3ListenerURL, []string{resource})
+
+	// Ensure the proper request was sent.
+	val, err := mgmtServer.XDSRequestChan.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Error waiting for mgmt server response: %v", err)
+	}
+	wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
+		Node:          nodeProto,
+		ResourceNames: []string{resource},
+		TypeUrl:       "type.googleapis.com/envoy.config.listener.v3.Listener",
+	}}
+	gotReq := val.(*fakeserver.Request)
+	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
+		t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
+	}
+
+	// Send a request for a cluster resource.
+	tr.SendRequest(version.V3ClusterURL, []string{resource})
+
+	// Ensure the proper request was sent.
+	val, err = mgmtServer.XDSRequestChan.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Error waiting for mgmt server response: %v", err)
+	}
+	wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
+		ResourceNames: []string{resource},
+		TypeUrl:       "type.googleapis.com/envoy.config.cluster.v3.Cluster",
+	}}
+	gotReq = val.(*fakeserver.Request)
+	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
+		t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
+	}
+
+	// Remove the cluster subscription by requesting an empty list.
+	tr.SendRequest(version.V3ClusterURL, []string{})
+
+	// Ensure the proper request was sent.
+	val, err = mgmtServer.XDSRequestChan.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Error waiting for mgmt server response: %v", err)
+	}
+	wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
+		ResourceNames: []string{},
+		TypeUrl:       "type.googleapis.com/envoy.config.cluster.v3.Cluster",
+	}}
+	gotReq = val.(*fakeserver.Request)
+	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
+		t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
+	}
+
+	// Cause the stream to restart.
+	mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}
+
+	// Ensure the proper LDS request was sent.
+	val, err = mgmtServer.XDSRequestChan.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Error waiting for mgmt server response: %v", err)
+	}
+	wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
+		Node:          nodeProto,
+		ResourceNames: []string{resource},
+		TypeUrl:       "type.googleapis.com/envoy.config.listener.v3.Listener",
+	}}
+	gotReq = val.(*fakeserver.Request)
+	if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
+		t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
+	}
+
+	// Ensure no cluster request is sent since there are no cluster resources.
+	ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer cancel()
+	if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
+		t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
+	}
+}
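
The transport.go change above boils down to two rules: the node proto must go out exactly once per ADS stream, and a stream restart must not generate requests for type URLs whose resource sets have become empty. The following standalone sketch, which is not part of the patch (adsClient, discoveryRequest, onNewStream, and listenerURL are illustrative stand-ins, not grpc-go APIs), shows the same bookkeeping in isolation:

// Standalone sketch (not grpc-go code) of the per-stream node-proto
// bookkeeping and the empty-resource-set skip introduced by this patch.
package main

import "fmt"

const listenerURL = "type.googleapis.com/envoy.config.listener.v3.Listener"

// discoveryRequest is a stand-in for v3discoverypb.DiscoveryRequest.
type discoveryRequest struct {
	hasNode       bool
	typeURL       string
	resourceNames []string
}

// adsClient tracks the subscribed resources per type URL and whether the
// node proto has already been sent on the current stream.
type adsClient struct {
	resources     map[string]map[string]bool // type URL -> subscribed resource names
	sentNodeProto bool
	sent          []discoveryRequest // requests "sent" so far, captured for illustration
}

// send builds a request, attaching the node proto only if nothing has been
// sent on this stream yet.
func (c *adsClient) send(typeURL string, names []string) {
	c.sent = append(c.sent, discoveryRequest{
		hasNode:       !c.sentNodeProto,
		typeURL:       typeURL,
		resourceNames: names,
	})
	c.sentNodeProto = true
}

// onNewStream mirrors sendExisting after the patch: reset the flag, then
// re-request only the type URLs that still have subscribed resources. If every
// set is empty, nothing is sent and the node proto goes out with the next new
// subscription instead.
func (c *adsClient) onNewStream() {
	c.sentNodeProto = false
	for typeURL, names := range c.resources {
		if len(names) == 0 {
			continue // the key exists, but all resources were unsubscribed
		}
		var list []string
		for name := range names {
			list = append(list, name)
		}
		c.send(typeURL, list)
	}
}

func main() {
	c := &adsClient{resources: map[string]map[string]bool{
		listenerURL: {}, // subscribed once, then unsubscribed: an empty set remains
	}}

	c.onNewStream()
	fmt.Println(len(c.sent)) // 0: no request is sent for the empty Listener set

	// The next subscription is the first request on the stream, so it
	// carries the node proto.
	c.send(listenerURL, []string{"some-resource"})
	fmt.Println(c.sent[0].hasNode) // true
}

Tracking sentNodeProto (set once a request actually goes out) instead of sendNodeProto (reset to true on restart) is what lets the resend step skip empty sets while still reporting whether the node proto has been sent.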
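
The new tests also lean on a negative assertion: after forcing a stream restart, they wait on the fake management server's request channel with a short deadline and treat context.DeadlineExceeded as success. A minimal sketch of that idiom follows; receive, requestChan, and shortTimeout are illustrative stand-ins for the tests' testutils-based channel receive and defaultTestShortTimeout, not the actual helpers.

// Sketch (illustrative, not the test code itself) of asserting that no
// request arrives: wait with a short deadline and expect DeadlineExceeded.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// shortTimeout plays the role of the tests' short timeout constant.
const shortTimeout = 100 * time.Millisecond

// receive returns the next value from the channel, or the context error if
// the deadline fires first.
func receive(ctx context.Context, ch <-chan string) (string, error) {
	select {
	case v := <-ch:
		return v, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	requestChan := make(chan string) // nothing will ever be written here

	ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
	defer cancel()

	if got, err := receive(ctx, requestChan); !errors.Is(err, context.DeadlineExceeded) {
		fmt.Printf("unexpected request %q (err = %v)\n", got, err)
		return
	}
	fmt.Println("no request observed within the short timeout, as expected")
}

Keeping the deadline short keeps the "nothing was sent" check cheap while still catching a request that a restarted stream would have sent promptly.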