Skip to content

Commit

Permalink
Include response chunking info in nonce (#3)
Browse files Browse the repository at this point in the history
If a given delta response contains too many resources, the server will
break it up into multiple responses. However, this means the client does
not know whether it received all the resources for its subscription.
This is especially relevant for wildcard subscriptions, for which the
client does not know the resources ahead of time and therefore cannot
wait for them explicitly. By returning additional metadata in the nonce
(there is no field for this in the delta discovery response, though I'm
hoping that will change cncf/xds#99), the
client can know if the server chunked the response, and react
accordingly.
  • Loading branch information
PapaCharlie authored Oct 2, 2024
1 parent d1254dc commit a4a1d81
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 42 deletions.
20 changes: 12 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@ fmt: $(GOBIN)/goimports

# Can be used to change the number of tests run, defaults to 1 to prevent caching
TESTCOUNT = 1
# Can be used to change the verobosity of tests: make test TESTVERBOSE=-v
TESTVERBOSE =
# Can be used to generate coverage reports for a specific package
COVERPKG = $(PACKAGE)
# Can be used to add flags to the go test invocation: make test TESTFLAGS=-v
TESTFLAGS =
# Can be used to change which package gets tested, defaults to all packages.
TESTPKG = ./...

test: $(COVERAGE)
$(COVERAGE):
@mkdir -p $(@D)
go test -race -coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/... -count=$(TESTCOUNT) $(TESTVERBOSE) $(TESTPKG)
test:
go test -race -count=$(TESTCOUNT) $(TESTFLAGS) $(TESTPKG)

# Can be used to generate coverage reports for a specific package
COVERPKG = $(PACKAGE)

.PHONY: $(COVERAGE)

coverage: $(COVERAGE)
$(COVERAGE):
@mkdir -p $(@D)
$(MAKE) test TESTFLAGS='-coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/...'
go tool cover -html=$(COVERAGE)

profile_cache:
Expand Down
30 changes: 30 additions & 0 deletions ads/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ protocol (ADS), such as convenient type aliases, constants and core definitions.
package ads

import (
"encoding/binary"
"encoding/hex"
"errors"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -255,3 +258,30 @@ func LookupStreamTypeByRPCMethod(rpcMethod string) (StreamType, bool) {
return UnknownStreamType, false
}
}

var (
errInvalidNonceEncoding = errors.New("nonce isn't in hex encoding")
errInvalidNonceLength = errors.New("decoded nonce did not have expected length")
)

// ParseRemainingChunksFromNonce checks whether the Diderot server implementation chunked the delta
// responses because not all resources could fit in the same response without going over the default
// max gRPC message size of 4MB. A nonce from Diderot always starts with the 64-bit nanosecond
// timestamp of when the response was generated on the server. Then the number of remaining chunks as
// a 32-bit integer. The sequence of integers is binary encoded with [binary.BigEndian] then hex
// encoded. If the given nonce does not match the expected format, this function simply returns 0
// along with an error describing why it does not match. If the error isn't nil, it means the nonce
// was not created by a Diderot server implementation, and therefore does not contain the expected
// information.
func ParseRemainingChunksFromNonce(nonce string) (remainingChunks int, err error) {
decoded, err := hex.DecodeString(nonce)
if err != nil {
return 0, errInvalidNonceEncoding
}

if len(decoded) != 12 {
return 0, errInvalidNonceLength
}

return int(binary.BigEndian.Uint32(decoded[8:12])), nil
}
27 changes: 27 additions & 0 deletions ads/ads_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ads_test

import (
"log"

"github.com/linkedin/diderot/ads"
)

func ExampleParseRemainingChunksFromNonce() {
// Acquire a delta ADS client
var client ads.DeltaClient

var responses []*ads.DeltaDiscoveryResponse
for {
res, err := client.Recv()
if err != nil {
log.Panicf("Error receiving delta response: %v", err)
}
responses = append(responses, res)

if remaining, _ := ads.ParseRemainingChunksFromNonce(res.Nonce); remaining == 0 {
break
}
}

log.Printf("All responses received: %+v", responses)
}
4 changes: 2 additions & 2 deletions internal/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func newSotWHandler(

res := &ads.SotWDiscoveryResponse{
TypeUrl: typeUrl,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}
for _, e := range entries {
res.Resources = append(res.Resources, e.Resource.Resource)
Expand All @@ -349,7 +349,7 @@ func newSotWHandler(

res := &ads.SotWDiscoveryResponse{
TypeUrl: typeUrl,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}
for _, r := range allResources {
res.Resources = append(res.Resources, r.Resource.Resource)
Expand Down
6 changes: 5 additions & 1 deletion internal/server/handlers_delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De
"typeURL", ds.typeURL,
"updates", len(ds.queuedUpdates),
)
for i, c := range chunks {
c.Nonce = utils.NewNonce(len(chunks) - i - 1)
}
} else {
chunks[0].Nonce = utils.NewNonce(0)
}

return chunks
Expand All @@ -182,7 +187,6 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De
func (ds *deltaSender) newChunk() *ads.DeltaDiscoveryResponse {
return &ads.DeltaDiscoveryResponse{
TypeUrl: ds.typeURL,
Nonce: utils.NewNonce(),
}
}

Expand Down
29 changes: 21 additions & 8 deletions internal/server/handlers_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"slices"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestInitialChunkSize(t *testing.T) {
typeURL := utils.GetTypeURL[*wrapperspb.StringValue]()
require.Equal(t, proto.Size(&ads.DeltaDiscoveryResponse{
TypeUrl: typeURL,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}), initialChunkSize(typeURL))
}

Expand All @@ -121,11 +122,23 @@ func TestDeltaHandlerChunking(t *testing.T) {
minChunkSize: initialChunkSize(typeURL),
}

sentResponses := ds.chunk(map[string]entry{
getSentResponses := func(resources map[string]entry, expectedChunks int) []*ads.DeltaDiscoveryResponse {
responses := ds.chunk(resources)
require.Len(t, responses, expectedChunks)
expectedRemainingChunks := 0
for _, res := range slices.Backward(responses) {
remaining, err := ads.ParseRemainingChunksFromNonce(res.Nonce)
require.NoError(t, err)
require.Equal(t, expectedRemainingChunks, remaining)
expectedRemainingChunks++
}
return responses
}

sentResponses := getSentResponses(map[string]entry{
foo.Name: {Resource: foo},
bar.Name: {Resource: bar},
})

}, 2)
require.Equal(t, len(sentResponses[0].Resources), 1)
require.Equal(t, len(sentResponses[1].Resources), 1)
response0 := sentResponses[0].Resources[0]
Expand All @@ -142,10 +155,10 @@ func TestDeltaHandlerChunking(t *testing.T) {
// Delete resources whose names are the same size as the resources to trip the chunker with the same conditions
name1 := strings.Repeat("1", resourceSize)
name2 := strings.Repeat("2", resourceSize)
sentResponses = ds.chunk(map[string]entry{
sentResponses = getSentResponses(map[string]entry{
name1: {Resource: nil},
name2: {Resource: nil},
})
}, 2)
require.Equal(t, len(sentResponses[0].RemovedResources), 1)
require.Equal(t, len(sentResponses[1].RemovedResources), 1)
require.ElementsMatch(t,
Expand All @@ -156,12 +169,12 @@ func TestDeltaHandlerChunking(t *testing.T) {
small1, small2, small3 := "a", "b", "c"
wayTooBig := strings.Repeat("3", 10*resourceSize)

sentResponses = ds.chunk(map[string]entry{
sentResponses = getSentResponses(map[string]entry{
small1: {Resource: nil},
small2: {Resource: nil},
small3: {Resource: nil},
wayTooBig: {Resource: nil},
})
}, 1)
require.Equal(t, len(sentResponses[0].RemovedResources), 3)
require.ElementsMatch(t, []string{small1, small2, small3}, sentResponses[0].RemovedResources)
require.Equal(t, int64(1), statsHandler.DeltaResourcesOverMaxSize.Load())
Expand Down
48 changes: 29 additions & 19 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package utils

import (
"encoding/base64"
"encoding/binary"
"encoding/hex"
"slices"
"strconv"
"strings"
"time"

Expand All @@ -12,24 +13,33 @@ import (
"google.golang.org/protobuf/proto"
)

// NonceLength is the length of the string returned by NewNonce. NewNonce encodes the current UNIX
// time in nanos in hex encoding, so the nonce will be 16 characters if the current UNIX nano time is
// greater than 2^60-1. This is because it takes 16 hex characters to encode 64 bits, but only 15 to
// encode 60 bits (the output of strconv.FormatInt is not padded by 0s). 2^60-1 nanos from epoch time
// (January 1st 1970) is 2006-07-14 23:58:24.606, which as of this writing is over 17 years ago. This
// is why it's guaranteed that NonceLength will be 16 characters (before that date, encoding the
// nanos only required 15 characters). For the curious, the UNIX nano timestamp will overflow int64
// some time in 2262, making this constant valid for the next few centuries.
const NonceLength = 16

// NewNonce creates a new unique nonce based on the current UNIX time in nanos. It always returns a
// string of length NonceLength.
func NewNonce() string {
// The second parameter to FormatInt is the base, e.g. 2 will return binary, 8 will return octal
// encoding, etc. 16 means FormatInt returns the integer in hex encoding, e.g. 30 => "1e" or
// 1704239351400 => "18ccc94c668".
const hexBase = 16
return strconv.FormatInt(time.Now().UnixNano(), hexBase)
const (
// NonceLength is the length of the string returned by NewNonce. NewNonce encodes the current UNIX
// time in nanos and the remaining chunks, encoded as 64-bit and 32-bit integers respectively, then
// hex encoded. This means a nonce will always be 8 + 4 bytes, multiplied by 2 by the hex encoding.
NonceLength = (8 + 4) * 2
)

// NewNonce creates a new unique nonce based on the current UNIX time in nanos, always returning a
// string of [NonceLength].
func NewNonce(remainingChunks int) string {
return newNonce(time.Now(), remainingChunks)
}

func newNonce(now time.Time, remainingChunks int) string {
// preallocating these buffers with constants (instead of doing `out = make([]byte, len(buf) * 2)`)
// means the compiler will allocate them on the stack, instead of heap. This significantly reduces
// the amount of garbage created by this function, as the only heap allocation will be the final
// string(out), rather than all of these buffers.
buf := make([]byte, NonceLength/2)
out := make([]byte, NonceLength)

binary.BigEndian.PutUint64(buf[:8], uint64(now.UnixNano()))
binary.BigEndian.PutUint32(buf[8:], uint32(remainingChunks))

hex.Encode(out, buf)

return string(out)
}

func GetTypeURL[T proto.Message]() string {
Expand Down
25 changes: 23 additions & 2 deletions internal/utils/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package utils

import (
"fmt"
"testing"
"time"

"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/linkedin/diderot/ads"
Expand Down Expand Up @@ -46,6 +48,25 @@ func TestProtoMap(t *testing.T) {
})
}

func TestNonceLength(t *testing.T) {
require.Len(t, NewNonce(), NonceLength)
func TestNewNonce(t *testing.T) {
now := time.Now()
t.Run("remainingChunks", func(t *testing.T) {
for _, expected := range []int{0, 42} {
nonce := newNonce(now, expected)
require.Equal(t, fmt.Sprintf("%x%08x", now.UnixNano(), expected), nonce)
actualRemainingChunks, err := ads.ParseRemainingChunksFromNonce(nonce)
require.NoError(t, err)
require.Equal(t, expected, actualRemainingChunks)
}
})
t.Run("badNonce", func(t *testing.T) {
remaining, err := ads.ParseRemainingChunksFromNonce("foo")
require.Error(t, err)
require.Zero(t, remaining)
})
t.Run("oldNonce", func(t *testing.T) {
remaining, err := ads.ParseRemainingChunksFromNonce(fmt.Sprintf("%x", now.UnixNano()))
require.Error(t, err)
require.Zero(t, remaining)
})
}
4 changes: 2 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (s *ADSServer) StreamAggregatedResources(stream ads.SotWStream) (err error)
return &ads.SotWDiscoveryResponse{
Resources: nil,
TypeUrl: req.TypeUrl,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}
},
setControlPlane: func(res *ads.SotWDiscoveryResponse, controlPlane *corev3.ControlPlane) {
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s *ADSServer) DeltaAggregatedResources(stream ads.DeltaStream) (err error)
return &ads.DeltaDiscoveryResponse{
TypeUrl: req.GetTypeUrl(),
RemovedResources: req.GetResourceNamesSubscribe(),
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
ControlPlane: s.controlPlane,
}
},
Expand Down

0 comments on commit a4a1d81

Please sign in to comment.