Skip to content

Commit

Permalink
xds/csds: populate new GenericXdsConfig field (#4898)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl authored Oct 26, 2021
1 parent 6e8625d commit 9fa2698
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 434 deletions.
148 changes: 29 additions & 119 deletions xds/csds/csds.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package csds
import (
"context"
"io"
"time"

v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
Expand Down Expand Up @@ -56,6 +55,13 @@ var (
}
)

const (
listenerTypeURL = "envoy.config.listener.v3.Listener"
routeConfigTypeURL = "envoy.config.route.v3.RouteConfiguration"
clusterTypeURL = "envoy.config.cluster.v3.Cluster"
endpointsTypeURL = "envoy.config.endpoint.v3.ClusterLoadAssignment"
)

// ClientStatusDiscoveryServer implementations interface ClientStatusDiscoveryServiceServer.
type ClientStatusDiscoveryServer struct {
// xdsClient will always be the same in practice. But we keep a copy in each
Expand Down Expand Up @@ -108,16 +114,21 @@ func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statusp
return nil, status.Errorf(codes.InvalidArgument, "node_matchers are not supported, request contains node_matchers: %v", req.NodeMatchers)
}

lds := dumpToGenericXdsConfig(listenerTypeURL, s.xdsClient.DumpLDS)
rds := dumpToGenericXdsConfig(routeConfigTypeURL, s.xdsClient.DumpRDS)
cds := dumpToGenericXdsConfig(clusterTypeURL, s.xdsClient.DumpCDS)
eds := dumpToGenericXdsConfig(endpointsTypeURL, s.xdsClient.DumpEDS)
configs := make([]*v3statuspb.ClientConfig_GenericXdsConfig, 0, len(lds)+len(rds)+len(cds)+len(eds))
configs = append(configs, lds...)
configs = append(configs, rds...)
configs = append(configs, cds...)
configs = append(configs, eds...)

ret := &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: nodeProtoToV3(s.xdsClient.BootstrapConfig().NodeProto),
XdsConfig: []*v3statuspb.PerXdsConfig{
s.buildLDSPerXDSConfig(),
s.buildRDSPerXDSConfig(),
s.buildCDSPerXDSConfig(),
s.buildEDSPerXDSConfig(),
},
Node: nodeProtoToV3(s.xdsClient.BootstrapConfig().NodeProto),
GenericXdsConfigs: configs,
},
},
}
Expand Down Expand Up @@ -162,129 +173,28 @@ func nodeProtoToV3(n proto.Message) *v3corepb.Node {
return node
}

func (s *ClientStatusDiscoveryServer) buildLDSPerXDSConfig() *v3statuspb.PerXdsConfig {
version, dump := s.xdsClient.DumpLDS()
resources := make([]*v3adminpb.ListenersConfigDump_DynamicListener, 0, len(dump))
func dumpToGenericXdsConfig(typeURL string, dumpF func() (string, map[string]xdsclient.UpdateWithMD)) []*v3statuspb.ClientConfig_GenericXdsConfig {
_, dump := dumpF()
ret := make([]*v3statuspb.ClientConfig_GenericXdsConfig, 0, len(dump))
for name, d := range dump {
configDump := &v3adminpb.ListenersConfigDump_DynamicListener{
config := &v3statuspb.ClientConfig_GenericXdsConfig{
TypeUrl: typeURL,
Name: name,
ClientStatus: serviceStatusToProto(d.MD.Status),
}
if (d.MD.Timestamp != time.Time{}) {
configDump.ActiveState = &v3adminpb.ListenersConfigDump_DynamicListenerState{
VersionInfo: d.MD.Version,
Listener: d.Raw,
LastUpdated: timestamppb.New(d.MD.Timestamp),
}
}
if errState := d.MD.ErrState; errState != nil {
configDump.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
resources = append(resources, configDump)
}
return &v3statuspb.PerXdsConfig{
PerXdsConfig: &v3statuspb.PerXdsConfig_ListenerConfig{
ListenerConfig: &v3adminpb.ListenersConfigDump{
VersionInfo: version,
DynamicListeners: resources,
},
},
}
}

func (s *ClientStatusDiscoveryServer) buildRDSPerXDSConfig() *v3statuspb.PerXdsConfig {
_, dump := s.xdsClient.DumpRDS()
resources := make([]*v3adminpb.RoutesConfigDump_DynamicRouteConfig, 0, len(dump))
for _, d := range dump {
configDump := &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
VersionInfo: d.MD.Version,
XdsConfig: d.Raw,
LastUpdated: timestamppb.New(d.MD.Timestamp),
ClientStatus: serviceStatusToProto(d.MD.Status),
}
if (d.MD.Timestamp != time.Time{}) {
configDump.RouteConfig = d.Raw
configDump.LastUpdated = timestamppb.New(d.MD.Timestamp)
}
if errState := d.MD.ErrState; errState != nil {
configDump.ErrorState = &v3adminpb.UpdateFailureState{
config.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
resources = append(resources, configDump)
}
return &v3statuspb.PerXdsConfig{
PerXdsConfig: &v3statuspb.PerXdsConfig_RouteConfig{
RouteConfig: &v3adminpb.RoutesConfigDump{
DynamicRouteConfigs: resources,
},
},
}
}

func (s *ClientStatusDiscoveryServer) buildCDSPerXDSConfig() *v3statuspb.PerXdsConfig {
version, dump := s.xdsClient.DumpCDS()
resources := make([]*v3adminpb.ClustersConfigDump_DynamicCluster, 0, len(dump))
for _, d := range dump {
configDump := &v3adminpb.ClustersConfigDump_DynamicCluster{
VersionInfo: d.MD.Version,
ClientStatus: serviceStatusToProto(d.MD.Status),
}
if (d.MD.Timestamp != time.Time{}) {
configDump.Cluster = d.Raw
configDump.LastUpdated = timestamppb.New(d.MD.Timestamp)
}
if errState := d.MD.ErrState; errState != nil {
configDump.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
resources = append(resources, configDump)
}
return &v3statuspb.PerXdsConfig{
PerXdsConfig: &v3statuspb.PerXdsConfig_ClusterConfig{
ClusterConfig: &v3adminpb.ClustersConfigDump{
VersionInfo: version,
DynamicActiveClusters: resources,
},
},
}
}

func (s *ClientStatusDiscoveryServer) buildEDSPerXDSConfig() *v3statuspb.PerXdsConfig {
_, dump := s.xdsClient.DumpEDS()
resources := make([]*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig, 0, len(dump))
for _, d := range dump {
configDump := &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
VersionInfo: d.MD.Version,
ClientStatus: serviceStatusToProto(d.MD.Status),
}
if (d.MD.Timestamp != time.Time{}) {
configDump.EndpointConfig = d.Raw
configDump.LastUpdated = timestamppb.New(d.MD.Timestamp)
}
if errState := d.MD.ErrState; errState != nil {
configDump.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
resources = append(resources, configDump)
}
return &v3statuspb.PerXdsConfig{
PerXdsConfig: &v3statuspb.PerXdsConfig_EndpointConfig{
EndpointConfig: &v3adminpb.EndpointsConfigDump{
DynamicEndpointConfigs: resources,
},
},
ret = append(ret, config)
}
return ret
}

func serviceStatusToProto(serviceStatus xdsclient.ServiceStatus) v3adminpb.ClientResourceStatus {
Expand Down
Loading

0 comments on commit 9fa2698

Please sign in to comment.