Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental Server Implementation #422

Merged
merged 29 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8e9ef1f
get the base server implementation in
alecholmez Apr 6, 2021
607d6d7
get the base server implementation in
alecholmez Apr 6, 2021
e60f37c
doctor up some server code
alecholmez Apr 19, 2021
46df9d4
resolved some merge conflicts
alecholmez Apr 19, 2021
8acda3e
fixed double import
alecholmez Apr 19, 2021
983070f
refactor server logic to use a map of channels
alecholmez Apr 21, 2021
5154b8a
more corrections from PR review... tests are failing
alecholmez Apr 22, 2021
502507b
oops missed this code
alecholmez Apr 22, 2021
5ee43a8
Fixed build and modified tests
alecholmez May 3, 2021
3a8a63e
refactored request handling
alecholmez May 3, 2021
92cf396
refactor out declarative type handling
alecholmez May 3, 2021
55e6010
fixed the tests
alecholmez May 3, 2021
c5e6ff5
address some more PR comments and cleanup
alecholmez May 7, 2021
5d42783
added delta xds integration test
alecholmez May 11, 2021
3f75e52
Merge pull request #7 from alecholmez/incremental-testing
alecholmez May 12, 2021
29dc60f
ads integration test and fixed type url for wildcards
alecholmez May 17, 2021
286d696
refactor streamstate to not mutate in cache
alecholmez May 18, 2021
aca74f9
fixed data race
alecholmez May 18, 2021
fce6e70
more testing
alecholmez May 19, 2021
ad8acc8
add state tracker
alecholmez May 19, 2021
f140e2d
Merge pull request #8 from alecholmez/exp-state
alecholmez May 19, 2021
f8460e1
change nonce handling
alecholmez May 19, 2021
efc9bd3
remove nonce check
alecholmez May 19, 2021
50f8fc2
move to one map for type information in the server, use assert in tes…
alecholmez May 20, 2021
12d90bb
minor pr changes
alecholmez May 23, 2021
b24673b
fixed nil map assignment
alecholmez May 23, 2021
5a95db6
remove mute nil check
alecholmez May 24, 2021
bfc32da
refactored the mock CreateDeltaWatch to use the stream state instead …
alecholmez May 24, 2021
8e386e5
fixed watch reassign
alecholmez May 25, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@ PKG := github.com/envoyproxy/go-control-plane
build:
@go build ./pkg/... ./envoy/...

.PHONY: clean
clean:
@echo "--> cleaning compiled objects and binaries"
@go clean -tags netgo -i ./...
@go mod tidy
@rm -rf $(BINDIR)
@rm -rf *.log

# TODO(mattklein123): See the note in TestLinearConcurrentSetWatch() for why we set -parallel here
# This should be removed.
.PHONY: test
test:
@go test -race -v -timeout 30s -parallel 100 ./pkg/...
@go test -race -v -timeout 30s -count=1 -parallel 100 ./pkg/...

.PHONY: cover
cover:
Expand All @@ -33,15 +41,16 @@ examples:
#-----------------
#-- integration
#-----------------
.PHONY: $(BINDIR)/test $(BINDIR)/upstream integration integration.ads integration.xds integration.rest integration.xds.mux
.PHONY: $(BINDIR)/test $(BINDIR)/upstream integration integration.ads integration.xds integration.rest integration.xds.mux integration.xds.delta integration.ads.delta

$(BINDIR)/upstream:
@go build -race -o $@ internal/upstream/main.go

$(BINDIR)/test:
@echo "Building test binary"
@go build -race -o $@ pkg/test/main/main.go

integration: integration.xds integration.ads integration.rest integration.xds.mux
integration: integration.xds integration.ads integration.rest integration.xds.mux integration.xds.delta integration.ads.delta

integration.ads: $(BINDIR)/test $(BINDIR)/upstream
env XDS=ads build/integration.sh
Expand All @@ -55,6 +64,12 @@ integration.rest: $(BINDIR)/test $(BINDIR)/upstream
integration.xds.mux: $(BINDIR)/test $(BINDIR)/upstream
env XDS=xds build/integration.sh -mux

integration.xds.delta: $(BINDIR)/test $(BINDIR)/upstream
env XDS=delta build/integration.sh

integration.ads.delta: $(BINDIR)/test $(BINDIR)/upstream
env XDS=delta-ads build/integration.sh

#--------------------------------------
#-- example xDS control plane server
#--------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions build/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ set -o pipefail

MESSAGE=$'Hi, there!\n'

# Management server type. Valid values are "ads", "xds", "rest"
# Management server type. Valid values are "ads", "xds", "rest", "delta", or "delta-ads"
XDS=${XDS:-ads}

# Number of RTDS layers.
Expand Down Expand Up @@ -40,4 +40,4 @@ function cleanup() {
trap cleanup EXIT

# run the test suite (which also contains the control plane)
bin/test --xds=${XDS} --runtimes=${RUNTIMES} -debug -message="$MESSAGE" "$@"
bin/test --xds=${XDS} --runtimes=${RUNTIMES} -debug -message="$MESSAGE" "$@"
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/proto/otlp v0.7.0
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.36.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCb
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down Expand Up @@ -111,7 +112,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
2 changes: 1 addition & 1 deletion pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, *stream.StreamState) (value chan DeltaResponse, cancel func())
CreateDeltaWatch(*DeltaRequest, stream.StreamState) (value chan DeltaResponse, cancel func())
}

// ConfigFetcher fetches configuration resources from cache
Expand Down
19 changes: 10 additions & 9 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
)

// Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change.
func respondDelta(request *DeltaRequest, value chan DeltaResponse, st *stream.StreamState, snapshot Snapshot, log log.Logger) *RawDeltaResponse {
resp, err := createDeltaResponse(request, st, snapshot)
func respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState, snapshot Snapshot, log log.Logger) *RawDeltaResponse {
resp, err := createDeltaResponse(request, state, snapshot, log)
if err != nil {
if log != nil {
log.Errorf("Error creating delta response: %v", err)
Expand All @@ -34,15 +34,15 @@ func respondDelta(request *DeltaRequest, value chan DeltaResponse, st *stream.St
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 {
if log != nil {
log.Debugf("node: %s, sending delta response with resources: %v removed resources %v wildcard: %t",
request.GetNode().GetId(), resp.Resources, resp.RemovedResources, st.IsWildcard)
request.GetNode().GetId(), resp.Resources, resp.RemovedResources, state.Wildcard)
}
value <- resp
return resp
}
return nil
}

func createDeltaResponse(req *DeltaRequest, st *stream.StreamState, snapshot Snapshot) (*RawDeltaResponse, error) {
func createDeltaResponse(req *DeltaRequest, state stream.StreamState, snapshot Snapshot, log log.Logger) (*RawDeltaResponse, error) {
resources := snapshot.GetResources((req.TypeUrl))

// variables to build our response with
Expand All @@ -51,20 +51,21 @@ func createDeltaResponse(req *DeltaRequest, st *stream.StreamState, snapshot Sna
toRemove := make([]string, 0)

// If we are handling a wildcard request, we want to respond with all resources
if st.IsWildcard {
switch {
case state.Wildcard:
for name, r := range resources {
// Since we've already precomputed the version hashes of the new snapshot,
// we can just set it here to be used for comparison later
version := snapshot.GetVersionMap()[req.TypeUrl][name]
nextVersionMap[name] = version
prevVersion, found := st.ResourceVersions[name]
prevVersion, found := state.ResourceVersions[name]
if !found || (prevVersion != nextVersionMap[name]) {
filtered = append(filtered, r)
}
}
} else {
default:
// Reply only with the requested resources
for name, prevVersion := range st.ResourceVersions {
for name, prevVersion := range state.ResourceVersions {
if r, ok := resources[name]; ok {
nextVersion := snapshot.GetVersionMap()[req.TypeUrl][name]
if prevVersion != nextVersion {
Expand All @@ -76,7 +77,7 @@ func createDeltaResponse(req *DeltaRequest, st *stream.StreamState, snapshot Sna
}

// Compute resources for removal regardless of the request type
for name := range st.ResourceVersions {
for name := range state.ResourceVersions {
if _, ok := resources[name]; !ok {
toRemove = append(toRemove, name)
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, &stream.StreamState{IsWildcard: true, ResourceVersions: nil})
}, stream.StreamState{ResourceVersions: nil, Wildcard: true})
}

if err := c.SetSnapshot(key, snapshot); err != nil {
t.Fatal(err)
}

vm := make(map[string]map[string]string)
versionMap := make(map[string]map[string]string)
for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
select {
Expand All @@ -43,7 +43,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
t.Errorf("got resources %v, want %v", out.(*cache.RawDeltaResponse).Resources, snapshot.GetResources(typ))
}
vMap := out.GetNextVersionMap()
vm[typ] = vMap
versionMap[typ] = vMap
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
}
Expand All @@ -59,7 +59,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, &stream.StreamState{IsWildcard: false, ResourceVersions: vm[typ]})
}, stream.StreamState{ResourceVersions: versionMap[typ]})
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand All @@ -83,7 +83,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
t.Fatalf("got resources %v, want %v", out.(*cache.RawDeltaResponse).Resources, snapshot2.GetResources(rsrc.EndpointType))
}
vMap := out.GetNextVersionMap()
vm[testTypes[0]] = vMap
versionMap[testTypes[0]] = vMap
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
}
Expand All @@ -101,7 +101,7 @@ func TestDeltaRemoveResources(t *testing.T) {
Id: "node",
},
TypeUrl: typ,
}, &stream.StreamState{IsWildcard: true, ResourceVersions: nil})
}, stream.StreamState{ResourceVersions: make(map[string]string), Wildcard: true})
}

if err := c.SetSnapshot(key, snapshot); err != nil {
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestDeltaRemoveResources(t *testing.T) {
Id: "node",
},
TypeUrl: typ,
}, &stream.StreamState{IsWildcard: true, ResourceVersions: versionMap[typ]})
}, stream.StreamState{ResourceVersions: versionMap[typ], Wildcard: true})
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{clusterName},
}, &stream.StreamState{IsWildcard: true, ResourceVersions: nil})
}, stream.NewStreamState())
}
})
}(i)
Expand All @@ -203,7 +203,7 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, &stream.StreamState{IsWildcard: true, ResourceVersions: nil})
}, stream.NewStreamState())

// Cancel the watch
cancel()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func())
}
}

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, st *stream.StreamState) (chan DeltaResponse, func()) {
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) {
return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (mux *MuxCache) CreateWatch(request *Request) (chan Response, func()) {
return cache.CreateWatch(request)
}

func (cache *MuxCache) CreateDeltaWatch(request *DeltaRequest, st *stream.StreamState) (chan DeltaResponse, func()) {
func (cache *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) {
return nil, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func createResponse(request *Request, resources map[string]types.ResourceWithTtl
}

// CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache.
func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, st *stream.StreamState) (chan DeltaResponse, func()) {
func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) {
nodeID := cache.hash.ID(request.Node)
t := request.GetTypeUrl()

Expand Down Expand Up @@ -424,17 +424,17 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, st *stream.S
cache.log.Errorf("failed to compute version for snapshot resources inline, waiting for next snapshot update")
delayedResponse = true
}
delayedResponse = respondDelta(request, value, st, snapshot, cache.log) == nil
delayedResponse = respondDelta(request, value, state, snapshot, cache.log) == nil
}

if delayedResponse {
watchID := cache.nextDeltaWatchID()
if cache.log != nil {
cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, system version %q", watchID,
t, st.ResourceVersions, nodeID, snapshot.GetVersion(t))
t, state.ResourceVersions, nodeID, snapshot.GetVersion(t))
}

info.SetDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: st})
info.SetDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})

return value, cache.cancelDeltaWatch(nodeID, watchID)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/v3/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type DeltaResponseWatch struct {
Response chan DeltaResponse

// VersionMap for the stream
StreamState *stream.StreamState
StreamState stream.StreamState
}

// newStatusInfo initializes a status info data structure.
Expand Down Expand Up @@ -148,8 +148,8 @@ func (info *statusInfo) GetLastDeltaWatchRequestTime() time.Time {
return info.lastDeltaWatchRequestTime
}

// GetDeltaVersionMap will pull the version map out of a specific watch
func (info *statusInfo) GetDeltaStreamState(watchID int64) *stream.StreamState {
// GetDeltaStreamState will pull the stream state with the version map out of a specific watch
func (info *statusInfo) GetDeltaStreamState(watchID int64) stream.StreamState {
info.mu.RLock()
defer info.mu.RUnlock()
return info.deltaWatches[watchID].StreamState
Expand Down
Loading