Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Process telemetry labels from CDS in xDS Balancers #7116

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions internal/stats/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package stats provides internal stats related functionality.
package stats

import "context"

// Labels holds the labels that are attached to recorded metrics. A *Labels is
// carried in a context.Context so that several components handling the same
// RPC can read from and write to the same underlying map.
type Labels struct {
	// TelemetryLabels maps telemetry label names to the values that should be
	// recorded for them.
	TelemetryLabels map[string]string
}

// labelsKey is the unexported context key type under which a *Labels value is
// stored; being unexported, it cannot collide with keys from other packages.
type labelsKey struct{}

// GetLabels returns the Labels stored in the context, or nil if there is none.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

theo* and end sentence with punctuation pls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done for both.

func GetLabels(ctx context.Context) *Labels {
	// A missing value, or a value of any other type, fails the assertion and
	// is reported to the caller as nil.
	if stored, ok := ctx.Value(labelsKey{}).(*Labels); ok {
		return stored
	}
	return nil
}

// SetLabels returns a context derived from ctx that carries the given Labels.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please finish the sentence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in the context.

func SetLabels(ctx context.Context, labels *Labels) context.Context {
	// The new value shadows any Labels already stored on ctx; merging with
	// (appending to) previously stored labels would be a possible alternative.
	key := labelsKey{}
	return context.WithValue(ctx, key, labels)
}
141 changes: 141 additions & 0 deletions test/xds/xds_telemetry_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds_test

import (
"context"
"fmt"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please group pbs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh, this slipped through. Switched.

"google.golang.org/grpc/stats"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"google.golang.org/protobuf/types/known/structpb"
)

// Telemetry label keys and values that the test places in the cluster's
// metadata and later expects the stats handler to observe.
const (
	serviceNameKey        = "service_name"
	serviceNamespaceKey   = "service_namespace"
	serviceNameValue      = "grpc-service"
	serviceNamespaceValue = "grpc-service-namespace"
)

// TestTelemetryLabels tests that telemetry labels from CDS make their way to
// the stats handler. The stats handler sets the mutable context value that the
// cluster impl picker will write telemetry labels to, and then the stats
// handler asserts that subsequent HandleRPC calls from the RPC lifecycle
// contain telemetry labels that it can see.
func (s) TestTelemetryLabels(t *testing.T) {
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/cleanup1/cleanup/ please

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, switched.

defer cleanup1()

server := stubserver.StartTestService(t, nil)
defer server.Stop()

const xdsServiceName = "my-service-client-side-xds"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: xdsServiceName,
NodeID: nodeID,
Host: "localhost",
Port: testutils.ParsePort(t, server.Address),
SecLevel: e2e.SecurityLevelNone,
})

resources.Clusters[0].Metadata = &v3corepb.Metadata{
FilterMetadata: map[string]*structpb.Struct{
"com.google.csm.telemetry_labels": {
Fields: map[string]*structpb.Value{
serviceNameKey: structpb.NewStringValue(serviceNameValue),
serviceNamespaceKey: structpb.NewStringValue(serviceNamespaceValue),
},
},
},
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

fsh := &fakeStatsHandler{
t: t,
}

cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", xdsServiceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver), grpc.WithStatsHandler(fsh))
if err != nil {
t.Fatalf("failed to create a new client to local test server: %v", err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}

// fakeStatsHandler is the stats handler installed on the client in
// TestTelemetryLabels. It places a Labels value in each RPC's context and
// checks the telemetry labels written into it.
type fakeStatsHandler struct {
	// labels is the Labels value most recently installed by TagRPC; HandleRPC
	// reads the telemetry labels from it.
	labels *istats.Labels

	// t is the test against which assertion failures are reported.
	t *testing.T
}

// TagConn returns ctx unchanged; this handler attaches nothing to
// connection-level contexts.
func (fsh *fakeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}

// HandleConn is a no-op; connection stats are ignored by this handler.
func (fsh *fakeStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

// TagRPC installs a fresh, empty Labels value in the RPC's context and
// remembers it on the handler so HandleRPC can inspect it later.
func (fsh *fakeStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
	fsh.labels = &istats.Labels{TelemetryLabels: make(map[string]string)}
	// The returned context is immutable, but it carries a pointer to Labels:
	// cluster_impl writes into the TelemetryLabels map it points at, and those
	// writes are visible through fsh.labels.
	return istats.SetLabels(ctx, fsh.labels)
}

func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
switch rs.(type) {
// stats.Begin won't get Telemetry Labels because happens after picker
// picks.

// These three stats callouts trigger all metrics for OpenTelemetry that
// aren't started. All of these should have access to the desired telemetry
// labels.
Comment on lines +125 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you mean to test all 3, you will need a continue on the cases. Or case a, b, c:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops switched.

case *stats.OutPayload:
case *stats.InPayload:
case *stats.End:
if label, ok := fsh.labels.TelemetryLabels[serviceNameKey]; !ok || label != serviceNameValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKey, serviceNameValue, label)
}
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKey]; !ok || label != serviceNamespaceValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKey, serviceNamespaceValue, label)
}

default:
// Nothing to assert for the other stats.Handler callouts.
}

}
2 changes: 2 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste
}
dm.OutlierDetection = odJSON

dm.TelemetryLabels = cluster.TelemetryLabels

return append(dms, dm), true, nil
}

Expand Down
6 changes: 4 additions & 2 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type clusterImplBalancer struct {
requestCounterService string // The service name for the request counter.
requestCounter *xdsclient.ClusterRequestsCounter
requestCountMax uint32
telemetryLabels map[string]string
pickerUpdateCh *buffer.Unbounded
}

Expand Down Expand Up @@ -469,14 +470,15 @@ func (b *clusterImplBalancer) run() {
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
}, b.loadWrapper),
}, b.loadWrapper, b.telemetryLabels),
})
case *LBConfig:
b.telemetryLabels = u.TelemetryLabels
dc := b.handleDropAndRequestCount(u)
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: newPicker(b.childState, dc, b.loadWrapper),
Picker: newPicker(b.childState, dc, b.loadWrapper, b.telemetryLabels),
})
}
}
Expand Down
10 changes: 6 additions & 4 deletions xds/internal/balancer/clusterimpl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ type LBConfig struct {
EDSServiceName string `json:"edsServiceName,omitempty"`
// LoadReportingServer is the LRS server to send load reports to. If not
// present, load reporting will be disabled.
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
DropCategories []DropConfig `json:"dropCategories,omitempty"`
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
DropCategories []DropConfig `json:"dropCategories,omitempty"`
// TelemetryLabels are the telemetry Labels associated with this cluster.
TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"`
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
}

func parseConfig(c json.RawMessage) (*LBConfig, error) {
Expand Down
33 changes: 22 additions & 11 deletions xds/internal/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/xdsclient"
Expand Down Expand Up @@ -78,24 +79,34 @@ type loadReporter interface {

// Picker implements RPC drop, circuit breaking drop and load reporting.
type picker struct {
drops []*dropper
s balancer.State
loadStore loadReporter
counter *xdsclient.ClusterRequestsCounter
countMax uint32
drops []*dropper
s balancer.State
loadStore loadReporter
counter *xdsclient.ClusterRequestsCounter
countMax uint32
telemetryLabels map[string]string
}

func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *picker {
func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter, telemetryLabels map[string]string) *picker {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we make this a method on the balancer instead of adding more parameters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, done.

return &picker{
drops: config.drops,
s: s,
loadStore: loadStore,
counter: config.requestCounter,
countMax: config.requestCountMax,
drops: config.drops,
s: s,
loadStore: loadStore,
counter: config.requestCounter,
countMax: config.requestCountMax,
telemetryLabels: telemetryLabels,
}
}

func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if info.Ctx != nil {
if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil {
for key, value := range d.telemetryLabels {
labels.TelemetryLabels[key] = value
}
} // Unconditionally set, even dropped or queued RPC's can use this label.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to above if info.Ctx != nil or just inside the open brace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added above the info.Ctx != nil check, the comment turned out to be: Unconditionally set labels if present, even dropped or queued RPC's can use these labels.

}

// Don't drop unless the inner picker is READY. Similar to
// https://github.com/grpc/grpc-go/issues/2622.
if d.s.ConnectivityState == connectivity.Ready {
Expand Down
2 changes: 2 additions & 0 deletions xds/internal/balancer/clusterresolver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type DiscoveryMechanism struct {
// OutlierDetection is the Outlier Detection LB configuration for this
// priority.
OutlierDetection json.RawMessage `json:"outlierDetection,omitempty"`
// TelemetryLabels are the telemetry labels associated with this cluster.
TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"`
outlierDetection outlierdetection.LBConfig
}

Expand Down
6 changes: 4 additions & 2 deletions xds/internal/balancer/clusterresolver/configbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism
retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
}
return pName, &clusterimpl.LBConfig{
Cluster: mechanism.Cluster,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
Cluster: mechanism.Cluster,
TelemetryLabels: mechanism.TelemetryLabels,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
}, retAddrs
}

Expand Down Expand Up @@ -283,6 +284,7 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority
EDSServiceName: mechanism.EDSServiceName,
LoadReportingServer: mechanism.LoadReportingServer,
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
TelemetryLabels: mechanism.TelemetryLabels,
DropCategories: drops,
ChildPolicy: xdsLBPolicy,
}, addrs, nil
Expand Down
Loading