diff --git a/Makefile b/Makefile index 6f86e3d660..e07adf883e 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,10 @@ build: test: @go test ./pkg/... +.PHONY: benchmark +benchmark: + @go test ./pkg/... -bench=. + .PHONY: cover cover: @build/coverage.sh diff --git a/pkg/cache/v2/cache_test.go b/pkg/cache/v2/cache_test.go index 75194f6235..ef6c059a3c 100644 --- a/pkg/cache/v2/cache_test.go +++ b/pkg/cache/v2/cache_test.go @@ -65,3 +65,54 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) { assert.Equal(t, r.Name, resourceName) assert.Equal(t, discoveryResponse, dr) } + +// BENCHMARKS ===================================================================================================== + +func BenchmarkResponseGetDiscoveryResponse(b *testing.B) { + routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}} + resp := cache.RawResponse{ + Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType}, + Version: "v", + Resources: routes, + } + + discoveryResponse, err := resp.GetDiscoveryResponse() + assert.Nil(b, err) + assert.Equal(b, discoveryResponse.VersionInfo, resp.Version) + assert.Equal(b, len(discoveryResponse.Resources), 1) + + cachedResponse, err := resp.GetDiscoveryResponse() + assert.Nil(b, err) + assert.Same(b, discoveryResponse, cachedResponse) + + r := &route.RouteConfiguration{} + err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r) + assert.Nil(b, err) + assert.Equal(b, r.Name, resourceName) +} + +func BenchmarkPassthroughResponseGetDiscoveryResponse(b *testing.B) { + routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}} + rsrc, err := ptypes.MarshalAny(routes[0]) + assert.Nil(b, err) + dr := &discovery.DiscoveryResponse{ + TypeUrl: resource.RouteType, + Resources: []*any.Any{rsrc}, + VersionInfo: "v", + } + resp := cache.PassthroughResponse{ + Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType}, + DiscoveryResponse: dr, + } + + discoveryResponse, err := resp.GetDiscoveryResponse() + assert.Nil(b, err) + assert.Equal(b, discoveryResponse.VersionInfo, resp.DiscoveryResponse.VersionInfo) + assert.Equal(b, len(discoveryResponse.Resources), 1) + + r := &route.RouteConfiguration{} + err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r) + assert.Nil(b, err) + assert.Equal(b, r.Name, resourceName) + assert.Equal(b, discoveryResponse, dr) +} diff --git a/pkg/cache/v2/simple_test.go b/pkg/cache/v2/simple_test.go index 825cfb6df7..7d58eadad5 100644 --- a/pkg/cache/v2/simple_test.go +++ b/pkg/cache/v2/simple_test.go @@ -73,6 +73,7 @@ var ( type logger struct { t *testing.T + b *testing.B } func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) } @@ -275,3 +276,118 @@ func TestSnapshotClear(t *testing.T) { t.Errorf("keys should be empty") } } + +// BENCHMARKS ===================================================================================================== + +func BenchmarkSnapshotCache(b *testing.B) { + c := cache.NewSnapshotCache(true, group{}, nil) + + if _, err := c.GetSnapshot(key); err == nil { + b.Errorf("unexpected snapshot found for key %q", key) + } + + if err := c.SetSnapshot(key, snapshot); err != nil { + b.Fatal(err) + } + + snap, err := c.GetSnapshot(key) + if err != nil { + b.Fatal(err) + } + if !reflect.DeepEqual(snap, snapshot) { + b.Errorf("expect snapshot: %v, got: %v", snapshot, snap) + } + + // try to get endpoints with incorrect list of names + // should not receive response + value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}) + select { + case out := <-value: + b.Errorf("watch for endpoints and mismatched names => got %v, want none", out) + case <-time.After(time.Second / 4): + } + + for _, typ := range testTypes { + b.Run(typ, func(b *testing.B) { + value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + select { + case out := <-value: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + b.Errorf("got version %q, want %q", gotVersion, version) + } + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResources(typ)) { + b.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResources(typ)) + } + case <-time.After(time.Second): + b.Fatal("failed to receive snapshot response") + } + }) + } +} + +func BenchmarkSnapshotCacheFetch(b *testing.B) { + c := cache.NewSnapshotCache(true, group{}, nil) + if err := c.SetSnapshot(key, snapshot); err != nil { + b.Fatal(err) + } + + for _, typ := range testTypes { + b.Run(typ, func(b *testing.B) { + resp, err := c.Fetch(context.Background(), &discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + if err != nil || resp == nil { + b.Fatal("unexpected error or null response") + } + if gotVersion, _ := resp.GetVersion(); gotVersion != version { + b.Errorf("got version %q, want %q", gotVersion, version) + } + }) + } + + // no response for missing snapshot + if resp, err := c.Fetch(context.Background(), + &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType, Node: &core.Node{Id: "oof"}}); resp != nil || err == nil { + b.Errorf("missing snapshot: response is not nil %v", resp) + } + + // no response for latest version + if resp, err := c.Fetch(context.Background(), + &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType, VersionInfo: version}); resp != nil || err == nil { + b.Errorf("latest version: response is not nil %v", resp) + } +} + +func BenchmarkSnapshotClear(b *testing.B) { + c := cache.NewSnapshotCache(true, group{}, nil) + if err := c.SetSnapshot(key, snapshot); err != nil { + b.Fatal(err) + } + c.ClearSnapshot(key) + if empty := c.GetStatusInfo(key); empty != nil { + b.Errorf("cache should be cleared") + } + if keys := c.GetStatusKeys(); len(keys) != 0 { + b.Errorf("keys should be empty") + } +} + +func BenchmarkSnapshotCacheWatchCancel(b *testing.B) { + c := cache.NewSnapshotCache(true, group{}, nil) + for _, typ := range testTypes { + _, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + cancel() + } + // should be status info for the node + if keys := c.GetStatusKeys(); len(keys) == 0 { + b.Error("got 0, want status info for the node") + } + + for _, typ := range testTypes { + if count := c.GetStatusInfo(key).GetNumWatches(); count > 0 { + b.Errorf("watches should be released for %s", typ) + } + } + + if empty := c.GetStatusInfo("missing"); empty != nil { + b.Errorf("should not return a status for unknown key: got %#v", empty) + } +} diff --git a/pkg/cache/v3/cache_test.go b/pkg/cache/v3/cache_test.go index 217edaf0d6..f8216601c7 100644 --- a/pkg/cache/v3/cache_test.go +++ b/pkg/cache/v3/cache_test.go @@ -66,3 +66,54 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) { assert.Equal(t, r.Name, resourceName) assert.Equal(t, discoveryResponse, dr) } + +// BENCHMARKS ===================================================================================================== + +func BenchmarkResponseGetDiscoveryResponse(b *testing.B) { + routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}} + resp := cache.RawResponse{ + Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType}, + Version: "v", + Resources: routes, + } + + discoveryResponse, err := resp.GetDiscoveryResponse() + assert.Nil(b, err) + assert.Equal(b, discoveryResponse.VersionInfo, resp.Version) + assert.Equal(b, len(discoveryResponse.Resources), 1) + + cachedResponse, err := resp.GetDiscoveryResponse() + assert.Nil(b, err) + assert.Same(b, discoveryResponse, cachedResponse) + + r := &route.RouteConfiguration{} + err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r) + assert.Nil(b, err) + assert.Equal(b, r.Name, resourceName) +} + +func BenchmarkPassthroughResponseGetDiscoveryResponse(b *testing.B) { + routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}} + rsrc, err := ptypes.MarshalAny(routes[0]) + assert.Nil(b, err) + dr := &discovery.DiscoveryResponse{ + TypeUrl: resource.RouteType, + Resources: []*any.Any{rsrc}, + VersionInfo: "v", + } + resp := cache.PassthroughResponse{ + Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType}, + DiscoveryResponse: dr, + } + + discoveryResponse, err := resp.GetDiscoveryResponse() + assert.Nil(b, err) + assert.Equal(b, discoveryResponse.VersionInfo, resp.DiscoveryResponse.VersionInfo) + assert.Equal(b, len(discoveryResponse.Resources), 1) + + r := &route.RouteConfiguration{} + err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r) + assert.Nil(b, err) + assert.Equal(b, r.Name, resourceName) + assert.Equal(b, discoveryResponse, dr) +} diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 7e2399470b..0c7f0005e8 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -74,6 +74,7 @@ var ( type logger struct { t *testing.T + b *testing.B } func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) } @@ -276,3 +277,118 @@ func TestSnapshotClear(t *testing.T) { t.Errorf("keys should be empty") } } + +// BENCHMARKS ===================================================================================================== + +func BenchmarkSnapshotCache(b *testing.B) { + c := cache.NewSnapshotCache(true, group{}, nil) + + if _, err := c.GetSnapshot(key); err == nil { + b.Errorf("unexpected snapshot found for key %q", key) + } + + if err := c.SetSnapshot(key, snapshot); err != nil { + b.Fatal(err) + } + + snap, err := c.GetSnapshot(key) + if err != nil { + b.Fatal(err) + } + if !reflect.DeepEqual(snap, snapshot) { + b.Errorf("expect snapshot: %v, got: %v", snapshot, snap) + } + + // try to get endpoints with incorrect list of names + // should not receive response + value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}) + select { + case out := <-value: + b.Errorf("watch for endpoints and mismatched names => got %v, want none", out) + case <-time.After(time.Second / 4): + } + + for _, typ := range testTypes { + b.Run(typ, func(b *testing.B) { + value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + select { + case out := <-value: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + b.Errorf("got version %q, want %q", gotVersion, version) + } + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResources(typ)) { + b.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResources(typ)) + } + case <-time.After(time.Second): + b.Fatal("failed to receive snapshot response") + } + }) + } +} + +func BenchmarkSnapshotCacheFetch(b *testing.B) { + c := cache.NewSnapshotCache(true, group{}, nil) + if err := c.SetSnapshot(key, snapshot); err != nil { + b.Fatal(err) + } + + for _, typ := range testTypes { + b.Run(typ, func(b *testing.B) { + resp, err := c.Fetch(context.Background(), &discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + if err != nil || resp == nil { + b.Fatal("unexpected error or null response") + } + if gotVersion, _ := resp.GetVersion(); gotVersion != version { + b.Errorf("got version %q, want %q", gotVersion, version) + } + }) + } + + // no response for missing snapshot + if resp, err := c.Fetch(context.Background(), + &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType, Node: &core.Node{Id: "oof"}}); resp != nil || err == nil { + b.Errorf("missing snapshot: response is not nil %v", resp) + } + + // no response for latest version + if resp, err := c.Fetch(context.Background(), + &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType, VersionInfo: version}); resp != nil || err == nil { + b.Errorf("latest version: response is not nil %v", resp) + } +} + +func BenchmarkSnapshotClear(b *testing.B) { + c := cache.NewSnapshotCache(true, group{}, nil) + if err := c.SetSnapshot(key, snapshot); err != nil { + b.Fatal(err) + } + c.ClearSnapshot(key) + if empty := c.GetStatusInfo(key); empty != nil { + b.Errorf("cache should be cleared") + } + if keys := c.GetStatusKeys(); len(keys) != 0 { + b.Errorf("keys should be empty") + } +} + +func BenchmarkSnapshotCacheWatchCancel(b *testing.B) { + c := cache.NewSnapshotCache(true, group{}, nil) + for _, typ := range testTypes { + _, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + cancel() + } + // should be status info for the node + if keys := c.GetStatusKeys(); len(keys) == 0 { + b.Error("got 0, want status info for the node") + } + + for _, typ := range testTypes { + if count := c.GetStatusInfo(key).GetNumWatches(); count > 0 { + b.Errorf("watches should be released for %s", typ) + } + } + + if empty := c.GetStatusInfo("missing"); empty != nil { + b.Errorf("should not return a status for unknown key: got %#v", empty) + } +} diff --git a/pkg/conversion/struct_test.go b/pkg/conversion/struct_test.go index 9935602be8..3e0f1f5636 100644 --- a/pkg/conversion/struct_test.go +++ b/pkg/conversion/struct_test.go @@ -64,3 +64,44 @@ func TestConversion(t *testing.T) { t.Error("StructToMessage(nil) => got no error") } } + +// BENCHMARKS ===================================================================================================== + +func BenchmarkConversion(b *testing.B) { + pb := &v2.DiscoveryRequest{ + VersionInfo: "test", + Node: &core.Node{Id: "proxy"}, + } + st, err := conversion.MessageToStruct(pb) + if err != nil { + b.Fatalf("unexpected error %v", err) + } + pbst := map[string]*pstruct.Value{ + "version_info": &pstruct.Value{Kind: &pstruct.Value_StringValue{StringValue: "test"}}, + "node": &pstruct.Value{Kind: &pstruct.Value_StructValue{StructValue: &pstruct.Struct{ + Fields: map[string]*pstruct.Value{ + "id": &pstruct.Value{Kind: &pstruct.Value_StringValue{StringValue: "proxy"}}, + }, + }}}, + } + if !cmp.Equal(st.Fields, pbst, cmp.Comparer(proto.Equal)) { + b.Errorf("MessageToStruct(%v) => got %v, want %v", pb, st.Fields, pbst) + } + + out := &v2.DiscoveryRequest{} + err = conversion.StructToMessage(st, out) + if err != nil { + b.Fatalf("unexpected error %v", err) + } + if !cmp.Equal(pb, out, cmp.Comparer(proto.Equal)) { + b.Errorf("StructToMessage(%v) => got %v, want %v", st, out, pb) + } + + if _, err = conversion.MessageToStruct(nil); err == nil { + b.Error("MessageToStruct(nil) => got no error") + } + + if err = conversion.StructToMessage(nil, &v2.DiscoveryRequest{}); err == nil { + b.Error("StructToMessage(nil) => got no error") + } +} diff --git a/pkg/server/v2/server_test.go b/pkg/server/v2/server_test.go index b8f12bf7b9..edc1b25f53 100644 --- a/pkg/server/v2/server_test.go +++ b/pkg/server/v2/server_test.go @@ -651,3 +651,211 @@ func TestCallbackError(t *testing.T) { }) } } + +// BENCHMARKS ===================================================================================================== + +func BenchmarkResponseHandlers(b *testing.B) { + for _, typ := range testTypes { + b.Run(typ, func(b *testing.B) { + config := makeMockConfigWatcher() + config.responses = makeResponses() + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + + // make a request + resp := makeMockStream(&testing.T{}) + resp.recv <- &discovery.DiscoveryRequest{Node: node, TypeUrl: typ} + go func() { + var err error + switch typ { + case rsrc.EndpointType: + err = s.StreamEndpoints(resp) + case rsrc.ClusterType: + err = s.StreamClusters(resp) + case rsrc.RouteType: + err = s.StreamRoutes(resp) + case rsrc.ListenerType: + err = s.StreamListeners(resp) + case rsrc.SecretType: + err = s.StreamSecrets(resp) + case rsrc.RuntimeType: + err = s.StreamRuntime(resp) + case opaqueType: + err = s.StreamAggregatedResources(resp) + } + if err != nil { + b.Errorf("Stream() => got %v, want no error", err) + } + }() + + // check a response + select { + case <-resp.sent: + close(resp.recv) + if want := map[string]int{typ: 1}; !reflect.DeepEqual(want, config.counts) { + b.Errorf("watch counts => got %v, want %v", config.counts, want) + } + case <-time.After(1 * time.Second): + b.Fatalf("got no response") + } + }) + } +} + +func BenchmarkAggregatedHandlers(b *testing.B) { + config := makeMockConfigWatcher() + config.responses = makeResponses() + resp := makeMockStream(&testing.T{}) + + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + TypeUrl: rsrc.ListenerType, + } + // Delta compress node + resp.recv <- &discovery.DiscoveryRequest{ + TypeUrl: rsrc.ClusterType, + } + resp.recv <- &discovery.DiscoveryRequest{ + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{clusterName}, + } + resp.recv <- &discovery.DiscoveryRequest{ + TypeUrl: rsrc.RouteType, + ResourceNames: []string{routeName}, + } + + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + go func() { + if err := s.StreamAggregatedResources(resp); err != nil { + b.Errorf("StreamAggregatedResources() => got %v, want no error", err) + } + }() + + count := 0 + for { + select { + case <-resp.sent: + count++ + if count >= 4 { + close(resp.recv) + if want := map[string]int{ + rsrc.EndpointType: 1, + rsrc.ClusterType: 1, + rsrc.RouteType: 1, + rsrc.ListenerType: 1, + }; !reflect.DeepEqual(want, config.counts) { + b.Errorf("watch counts => got %v, want %v", config.counts, want) + } + + // got all messages + return + } + case <-time.After(1 * time.Second): + b.Fatalf("got %d messages on the stream, not 4", count) + } + } +} + +func BenchmarkFetch(b *testing.B) { + config := makeMockConfigWatcher() + config.responses = makeResponses() + + requestCount := 0 + responseCount := 0 + callbackError := false + + cb := server.CallbackFuncs{ + StreamOpenFunc: func(ctx context.Context, i int64, s string) error { + if callbackError { + return errors.New("stream open error") + } + return nil + }, + FetchRequestFunc: func(ctx context.Context, request *discovery.DiscoveryRequest) error { + if callbackError { + return errors.New("fetch request error") + } + requestCount++ + return nil + }, + FetchResponseFunc: func(request *discovery.DiscoveryRequest, response *discovery.DiscoveryResponse) { + responseCount++ + }, + } + + s := server.NewServer(context.Background(), config, cb) + if out, err := s.FetchEndpoints(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for endpoints: %v", err) + } + if out, err := s.FetchClusters(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for clusters: %v", err) + } + if out, err := s.FetchRoutes(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for routes: %v", err) + } + if out, err := s.FetchListeners(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for listeners: %v", err) + } + if out, err := s.FetchSecrets(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for listeners: %v", err) + } + if out, err := s.FetchRuntime(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for listeners: %v", err) + } + + // try again and expect empty results + if out, err := s.FetchEndpoints(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil { + b.Errorf("expected empty or error for endpoints: %v", err) + } + if out, err := s.FetchClusters(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil { + b.Errorf("expected empty or error for clusters: %v", err) + } + if out, err := s.FetchRoutes(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil { + b.Errorf("expected empty or error for routes: %v", err) + } + if out, err := s.FetchListeners(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil { + b.Errorf("expected empty or error for listeners: %v", err) + } + + // try empty requests: not valid in a real gRPC server + if out, err := s.FetchEndpoints(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchClusters(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchRoutes(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchListeners(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchSecrets(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchRuntime(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + + // send error from callback + callbackError = true + if out, err := s.FetchEndpoints(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil || err == nil { + b.Errorf("expected empty or error due to callback error") + } + if out, err := s.FetchClusters(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil || err == nil { + b.Errorf("expected empty or error due to callback error") + } + if out, err := s.FetchRoutes(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil || err == nil { + b.Errorf("expected empty or error due to callback error") + } + if out, err := s.FetchListeners(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil || err == nil { + b.Errorf("expected empty or error due to callback error") + } + + // verify fetch callbacks + if want := 10; requestCount != want { + b.Errorf("unexpected number of fetch requests: got %d, want %d", requestCount, want) + } + if want := 6; responseCount != want { + b.Errorf("unexpected number of fetch responses: got %d, want %d", responseCount, want) + } +} diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 6a4b74b0ce..228d369e6c 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -652,3 +652,211 @@ func TestCallbackError(t *testing.T) { }) } } + +// BENCHMARKS ===================================================================================================== + +func BenchmarkResponseHandlers(b *testing.B) { + for _, typ := range testTypes { + b.Run(typ, func(b *testing.B) { + config := makeMockConfigWatcher() + config.responses = makeResponses() + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + + // make a request + resp := makeMockStream(&testing.T{}) + resp.recv <- &discovery.DiscoveryRequest{Node: node, TypeUrl: typ} + go func() { + var err error + switch typ { + case rsrc.EndpointType: + err = s.StreamEndpoints(resp) + case rsrc.ClusterType: + err = s.StreamClusters(resp) + case rsrc.RouteType: + err = s.StreamRoutes(resp) + case rsrc.ListenerType: + err = s.StreamListeners(resp) + case rsrc.SecretType: + err = s.StreamSecrets(resp) + case rsrc.RuntimeType: + err = s.StreamRuntime(resp) + case opaqueType: + err = s.StreamAggregatedResources(resp) + } + if err != nil { + b.Errorf("Stream() => got %v, want no error", err) + } + }() + + // check a response + select { + case <-resp.sent: + close(resp.recv) + if want := map[string]int{typ: 1}; !reflect.DeepEqual(want, config.counts) { + b.Errorf("watch counts => got %v, want %v", config.counts, want) + } + case <-time.After(1 * time.Second): + b.Fatalf("got no response") + } + }) + } +} + +func BenchmarkAggregatedHandlers(b *testing.B) { + config := makeMockConfigWatcher() + config.responses = makeResponses() + resp := makeMockStream(&testing.T{}) + + resp.recv <- &discovery.DiscoveryRequest{ + Node: node, + TypeUrl: rsrc.ListenerType, + } + // Delta compress node + resp.recv <- &discovery.DiscoveryRequest{ + TypeUrl: rsrc.ClusterType, + } + resp.recv <- &discovery.DiscoveryRequest{ + TypeUrl: rsrc.EndpointType, + ResourceNames: []string{clusterName}, + } + resp.recv <- &discovery.DiscoveryRequest{ + TypeUrl: rsrc.RouteType, + ResourceNames: []string{routeName}, + } + + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + go func() { + if err := s.StreamAggregatedResources(resp); err != nil { + b.Errorf("StreamAggregatedResources() => got %v, want no error", err) + } + }() + + count := 0 + for { + select { + case <-resp.sent: + count++ + if count >= 4 { + close(resp.recv) + if want := map[string]int{ + rsrc.EndpointType: 1, + rsrc.ClusterType: 1, + rsrc.RouteType: 1, + rsrc.ListenerType: 1, + }; !reflect.DeepEqual(want, config.counts) { + b.Errorf("watch counts => got %v, want %v", config.counts, want) + } + + // got all messages + return + } + case <-time.After(1 * time.Second): + b.Fatalf("got %d messages on the stream, not 4", count) + } + } +} + +func BenchmarkFetch(b *testing.B) { + config := makeMockConfigWatcher() + config.responses = makeResponses() + + requestCount := 0 + responseCount := 0 + callbackError := false + + cb := server.CallbackFuncs{ + StreamOpenFunc: func(ctx context.Context, i int64, s string) error { + if callbackError { + return errors.New("stream open error") + } + return nil + }, + FetchRequestFunc: func(ctx context.Context, request *discovery.DiscoveryRequest) error { + if callbackError { + return errors.New("fetch request error") + } + requestCount++ + return nil + }, + FetchResponseFunc: func(request *discovery.DiscoveryRequest, response *discovery.DiscoveryResponse) { + responseCount++ + }, + } + + s := server.NewServer(context.Background(), config, cb) + if out, err := s.FetchEndpoints(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for endpoints: %v", err) + } + if out, err := s.FetchClusters(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for clusters: %v", err) + } + if out, err := s.FetchRoutes(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for routes: %v", err) + } + if out, err := s.FetchListeners(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for listeners: %v", err) + } + if out, err := s.FetchSecrets(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for listeners: %v", err) + } + if out, err := s.FetchRuntime(context.Background(), &discovery.DiscoveryRequest{Node: node}); out == nil || err != nil { + b.Errorf("unexpected empty or error for listeners: %v", err) + } + + // try again and expect empty results + if out, err := s.FetchEndpoints(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil { + b.Errorf("expected empty or error for endpoints: %v", err) + } + if out, err := s.FetchClusters(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil { + b.Errorf("expected empty or error for clusters: %v", err) + } + if out, err := s.FetchRoutes(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil { + b.Errorf("expected empty or error for routes: %v", err) + } + if out, err := s.FetchListeners(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil { + b.Errorf("expected empty or error for listeners: %v", err) + } + + // try empty requests: not valid in a real gRPC server + if out, err := s.FetchEndpoints(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchClusters(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchRoutes(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchListeners(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchSecrets(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + if out, err := s.FetchRuntime(context.Background(), nil); out != nil { + b.Errorf("expected empty on empty request: %v", err) + } + + // send error from callback + callbackError = true + if out, err := s.FetchEndpoints(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil || err == nil { + b.Errorf("expected empty or error due to callback error") + } + if out, err := s.FetchClusters(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil || err == nil { + b.Errorf("expected empty or error due to callback error") + } + if out, err := s.FetchRoutes(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil || err == nil { + b.Errorf("expected empty or error due to callback error") + } + if out, err := s.FetchListeners(context.Background(), &discovery.DiscoveryRequest{Node: node}); out != nil || err == nil { + b.Errorf("expected empty or error due to callback error") + } + + // verify fetch callbacks + if want := 10; requestCount != want { + b.Errorf("unexpected number of fetch requests: got %d, want %d", requestCount, want) + } + if want := 6; responseCount != want { + b.Errorf("unexpected number of fetch responses: got %d, want %d", responseCount, want) + } +}