Skip to content

Commit

Permalink
xds: update nonce even if the ACK/NACK is not sent on wire (#3497)
Browse files Browse the repository at this point in the history
This can happen when the watch is canceled while the response is on wire.

Also, tag ACK/NACK with the stream so nonce for a new stream doesn't get updated by a ACK from the previous stream.
  • Loading branch information
menghanl authored Apr 3, 2020
1 parent 66e9dfe commit a9601d9
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 9 deletions.
6 changes: 5 additions & 1 deletion xds/internal/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ func (wi *watchInfo) stopTimer() {

type ackInfo struct {
typeURL string
version string // Nack if version is an empty string.
version string // NACK if version is an empty string.
nonce string
// ACK/NACK are tagged with the stream it's for. When the stream is down,
// all the ACK/NACK for this stream will be dropped, and the version/nonce
// won't be updated.
stream adsStream
}

type ldsUpdate struct {
Expand Down
29 changes: 21 additions & 8 deletions xds/internal/client/v2client.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,25 +255,36 @@ func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, v
// processAckInfo pulls the fields needed by the ack request from a ackInfo.
//
// If no active watch is found for this ack, it returns false for send.
func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) {
func (v2c *v2Client) processAckInfo(t *ackInfo, stream adsStream) (target []string, typeURL, version, nonce string, send bool) {
if t.stream != stream {
// If ACK's stream isn't the current sending stream, this means the ACK
// was pushed to queue before the old stream broke, and a new stream has
// been started since. Return immediately here so we don't update the
// nonce for the new stream.
return
}
typeURL = t.typeURL

v2c.mu.Lock()
defer v2c.mu.Unlock()

// Update the nonce no matter if we are going to send the ACK request on
// wire. We may not send the request if the watch is canceled. But the nonce
// needs to be updated so the next request will have the right nonce.
nonce = t.nonce
v2c.nonceMap[typeURL] = nonce

wi, ok := v2c.watchMap[typeURL]
if !ok {
// We don't send the request ack if there's no active watch (this can be
// either the server sends responses before any request, or the watch is
// canceled while the ackInfo is in queue), because there's no resource
// name. And if we send a request with empty resource name list, the
// server may treat it as a wild card and send us everything.
return // This returns all zero values, and false for send.
return nil, "", "", "", false
}
send = true

version = t.version
nonce = t.nonce
target = wi.target
if version == "" {
// This is a nack, get the previous acked version.
version = v2c.versionMap[typeURL]
Expand All @@ -283,8 +294,8 @@ func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, versi
} else {
v2c.versionMap[typeURL] = version
}
v2c.nonceMap[typeURL] = nonce
return
target = wi.target
return target, typeURL, version, nonce, send
}

// send is a separate goroutine for sending watch requests on the xds stream.
Expand Down Expand Up @@ -327,7 +338,7 @@ func (v2c *v2Client) send() {
case *watchInfo:
target, typeURL, version, nonce, send = v2c.processWatchInfo(t)
case *ackInfo:
target, typeURL, version, nonce, send = v2c.processAckInfo(t)
target, typeURL, version, nonce, send = v2c.processAckInfo(t, stream)
}
if !send {
continue
Expand Down Expand Up @@ -381,6 +392,7 @@ func (v2c *v2Client) recv(stream adsStream) bool {
typeURL: typeURL,
version: "",
nonce: resp.GetNonce(),
stream: stream,
})
v2c.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce(), respHandleErr)
continue
Expand All @@ -389,6 +401,7 @@ func (v2c *v2Client) recv(stream adsStream) bool {
typeURL: typeURL,
version: resp.GetVersionInfo(),
nonce: resp.GetNonce(),
stream: stream,
})
v2c.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce())
success = true
Expand Down
111 changes: 111 additions & 0 deletions xds/internal/client/v2client_ack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Ch
//
// It also waits and checks that the ack request contains the given version, and
// the generated nonce.
//
// TODO: make this and other helper function either consistently return error,
// and fatal() in the test code, or all call t.Fatal(), and mark them as
// helper().
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) (nonce string) {
nonce = sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version)
t.Logf("Good %s response pushed to fakeServer...", xdsname)
Expand Down Expand Up @@ -263,3 +267,110 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) {
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
}

// TestV2ClientAckNewWatchAfterCancel verifies the new request for a new watch
// after the previous watch is canceled, has the right version.
func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) {
var versionCDS = 3000

fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
defer v2c.close()
t.Log("Started xds v2Client...")

// Start a CDS watch.
callbackCh := testutils.NewChannel()
cancel := v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err)
callbackCh.Send(struct{}{})
})
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
t.Fatal(err)
}
t.Logf("FakeServer received %s request...", "CDS")

// Send a good CDS response, this function waits for the ACK with the right
// version.
nonce := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh)

// Cancel the CDS watch, and start a new one. The new watch should have the
// version from the response above.
cancel()
v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err)
callbackCh.Send(struct{}{})
})
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil {
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
}
versionCDS++

// Send a bad response with the next version.
sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest)
versionCDS++

// send another good response, and check for ack, with the new version.
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh)
versionCDS++
}

// TestV2ClientAckCancelResponseRace verifies if the response and ACK request
// race with cancel (which means the ACK request will not be sent on wire,
// because there's no active watch), the nonce will still be updated, and the
// new request with the new watch will have the correct nonce.
func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
var versionCDS = 3000

fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
defer v2c.close()
t.Log("Started xds v2Client...")

// Start a CDS watch.
callbackCh := testutils.NewChannel()
cancel := v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err)
callbackCh.Send(struct{}{})
})
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
}
t.Logf("FakeServer received %s request...", "CDS")

// send another good response, and check for ack, with the new version.
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh)
versionCDS++

// Cancel the watch before the next response is sent. This mimics the case
// watch is canceled while response is on wire.
cancel()

// Send a good response.
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodCDSResponse1, versionCDS)
t.Logf("Good %s response pushed to fakeServer...", "CDS")

// Expect no ACK because watch was canceled.
if req, err := fakeServer.XDSRequestChan.Receive(); err != testutils.ErrRecvTimeout {
t.Fatalf("Got unexpected xds request after watch is canceled: %v", req)
}

// Start a new watch. The new watch should have the nonce from the response
// above, and version from the first good response.
v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err)
callbackCh.Send(struct{}{})
})
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil {
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
}

// Send a bad response with the next version.
sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest)
versionCDS++

// send another good response, and check for ack, with the new version.
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh)
versionCDS++
}

0 comments on commit a9601d9

Please sign in to comment.