Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dataplane: Allow getting bootstrap parameters when using V2 APIs #18504

Merged
merged 7 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/18504.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
dataplane: Allow getting bootstrap parameters when using V2 APIs
```
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ issues:
- linters: [staticcheck]
text: 'SA1019: "io/ioutil" has been deprecated since Go 1.16'

# Allow usage of deprecated values.
- linters: [ staticcheck ]
text: 'SA1019:'
path: "(agent/grpc-external)"

# An argument that always receives the same value is often not a problem.
- linters: [unparam]
text: "always receives"
Expand Down
33 changes: 18 additions & 15 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,17 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
s.reportingManager = reporting.NewReportingManager(s.logger, getEnterpriseReportingDeps(flat), s, s.fsm.State())
go s.reportingManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

// Setup resource service clients.
if err := s.setupSecureResourceServiceClient(); err != nil {
return nil, err
}

if err := s.setupInsecureResourceServiceClient(flat.Registry, logger); err != nil {
return nil, err
}

// Initialize external gRPC server
s.setupExternalGRPC(config, flat.Registry, logger)
s.setupExternalGRPC(config, flat, logger)

// Initialize internal gRPC server.
//
Expand All @@ -826,14 +835,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
s.grpcLeaderForwarder = flat.LeaderForwarder

if err := s.setupSecureResourceServiceClient(); err != nil {
return nil, err
}

if err := s.setupInsecureResourceServiceClient(flat.Registry, logger); err != nil {
return nil, err
}

s.controllerManager = controller.NewManager(
s.insecureResourceServiceClient,
logger.Named(logging.ControllerRuntime),
Expand Down Expand Up @@ -1309,7 +1310,7 @@ func (s *Server) setupRPC() error {
}

// Initialize and register services on external gRPC server.
func (s *Server) setupExternalGRPC(config *Config, typeRegistry resource.Registry, logger hclog.Logger) {
func (s *Server) setupExternalGRPC(config *Config, deps Deps, logger hclog.Logger) {
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
Expand Down Expand Up @@ -1342,10 +1343,12 @@ func (s *Server) setupExternalGRPC(config *Config, typeRegistry resource.Registr
s.externalConnectCAServer.Register(s.externalGRPCServer)

dataplane.NewServer(dataplane.Config{
GetStore: func() dataplane.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.dataplane"),
ACLResolver: s.ACLResolver,
Datacenter: s.config.Datacenter,
GetStore: func() dataplane.StateStore { return s.FSM().State() },
Logger: logger.Named("grpc-api.dataplane"),
ACLResolver: s.ACLResolver,
Datacenter: s.config.Datacenter,
EnableV2: stringslice.Contains(deps.Experiments, CatalogResourceExperimentName),
ResourceAPIClient: s.insecureResourceServiceClient,
}).Register(s.externalGRPCServer)

serverdiscovery.NewServer(serverdiscovery.Config{
Expand Down Expand Up @@ -1375,7 +1378,7 @@ func (s *Server) setupExternalGRPC(config *Config, typeRegistry resource.Registr
s.peerStreamServer.Register(s.externalGRPCServer)

s.resourceServiceServer = resourcegrpc.NewServer(resourcegrpc.Config{
Registry: typeRegistry,
Registry: deps.Registry,
Backend: s.raftStorageBackend,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.resource"),
Expand Down
169 changes: 131 additions & 38 deletions agent/grpc-external/services/dataplane/get_envoy_bootstrap_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ import (
"errors"
"strings"

"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/mesh"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/state"
Expand All @@ -23,7 +31,11 @@ import (
)

func (s *Server) GetEnvoyBootstrapParams(ctx context.Context, req *pbdataplane.GetEnvoyBootstrapParamsRequest) (*pbdataplane.GetEnvoyBootstrapParamsResponse, error) {
logger := s.Logger.Named("get-envoy-bootstrap-params").With("service_id", req.GetServiceId(), "request_id", external.TraceID())
proxyID := req.ProxyId
if req.GetServiceId() != "" {
proxyID = req.GetServiceId()
}
logger := s.Logger.Named("get-envoy-bootstrap-params").With("proxy_id", proxyID, "request_id", external.TraceID())

logger.Trace("Started processing request")
defer logger.Trace("Finished processing request")
Expand All @@ -40,9 +52,84 @@ func (s *Server) GetEnvoyBootstrapParams(ctx context.Context, req *pbdataplane.G
return nil, status.Error(codes.Unauthenticated, err.Error())
}

if s.EnableV2 {
// Get the workload.
workloadId := &pbresource.ID{
Name: proxyID,
Tenancy: &pbresource.Tenancy{
Namespace: req.Namespace,
Partition: req.Partition,
PeerName: "local",
},
Type: catalog.WorkloadType,
}
workloadRsp, err := s.ResourceAPIClient.Read(ctx, &pbresource.ReadRequest{
Id: workloadId,
})
if err != nil {
// This error should already include the gRPC status code and so we don't need to wrap it
// in status.Error.
return nil, err
}
var workload pbcatalog.Workload
err = workloadRsp.Resource.Data.UnmarshalTo(&workload)
if err != nil {
return nil, status.Error(codes.Internal, "failed to parse workload data")
}

// Only workloads that have an associated identity can ask for proxy bootstrap parameters.
if workload.Identity == "" {
return nil, status.Errorf(codes.InvalidArgument, "workload %q doesn't have identity associated with it", req.ProxyId)
}

// todo (ishustava): ACL enforcement ensuring there's identity:write permissions.

// Get all proxy configurations for this workload. Currently we're only looking
// for proxy configurations in the same tenancy as the workload.
// todo (ishustava): we need to support wildcard proxy configurations as well.

proxyCfgList, err := s.ResourceAPIClient.List(ctx, &pbresource.ListRequest{
Tenancy: workloadRsp.Resource.Id.GetTenancy(),
Type: mesh.ProxyConfigurationType,
})
if err != nil {
return nil, err
}

// Collect and merge proxy configs.
// todo (ishustava): sorting and conflict resolution.
bootstrapCfg := &pbmesh.BootstrapConfig{}
dynamicCfg := &pbmesh.DynamicConfig{}
for _, cfgResource := range proxyCfgList.Resources {
var proxyCfg pbmesh.ProxyConfiguration
err = cfgResource.Data.UnmarshalTo(&proxyCfg)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to parse proxy configuration data: %q", cfgResource.Id.Name)
}
if isWorkloadSelected(req.ProxyId, proxyCfg.Workloads) {
proto.Merge(bootstrapCfg, proxyCfg.BootstrapConfig)
proto.Merge(dynamicCfg, proxyCfg.DynamicConfig)
}
}

accessLogs := makeAccessLogs(dynamicCfg.GetAccessLogs(), logger)

return &pbdataplane.GetEnvoyBootstrapParamsResponse{
Identity: workload.Identity,
Partition: workloadRsp.Resource.Id.Tenancy.Partition,
Namespace: workloadRsp.Resource.Id.Tenancy.Namespace,
BootstrapConfig: bootstrapCfg,
Datacenter: s.Datacenter,
NodeName: workload.NodeName,
AccessLogs: accessLogs,
}, nil
}

// The remainder of this file focuses on v1 implementation of this endpoint.

store := s.GetStore()

_, svc, err := store.ServiceNode(req.GetNodeId(), req.GetNodeName(), req.GetServiceId(), &entMeta, structs.DefaultPeerKeyword)
_, svc, err := store.ServiceNode(req.GetNodeId(), req.GetNodeName(), proxyID, &entMeta, structs.DefaultPeerKeyword)
if err != nil {
logger.Error("Error looking up service", "error", err)
if errors.Is(err, state.ErrNodeNotFound) {
Expand Down Expand Up @@ -81,8 +168,34 @@ func (s *Server) GetEnvoyBootstrapParams(ctx context.Context, req *pbdataplane.G
// Inspect access logging
// This is non-essential, and don't want to return an error unless there is a more serious issue
var accessLogs []string
if ns != nil && ns.Proxy.AccessLogs.Enabled {
envoyLoggers, err := accesslogs.MakeAccessLogs(&ns.Proxy.AccessLogs, false)
if ns != nil {
accessLogs = makeAccessLogs(&ns.Proxy.AccessLogs, logger)
}

// Build out the response
var serviceName string
if svc.ServiceKind == structs.ServiceKindConnectProxy {
serviceName = svc.ServiceProxy.DestinationServiceName
} else {
serviceName = svc.ServiceName
}

return &pbdataplane.GetEnvoyBootstrapParamsResponse{
Identity: serviceName,
Service: serviceName,
Partition: svc.EnterpriseMeta.PartitionOrDefault(),
Namespace: svc.EnterpriseMeta.NamespaceOrDefault(),
Config: bootstrapConfig,
Datacenter: s.Datacenter,
NodeName: svc.Node,
AccessLogs: accessLogs,
}, nil
}

func makeAccessLogs(logs structs.AccessLogs, logger hclog.Logger) []string {
var accessLogs []string
if logs.GetEnabled() {
envoyLoggers, err := accesslogs.MakeAccessLogs(logs, false)
if err != nil {
logger.Warn("Error creating the envoy access log config", "error", err)
}
Expand All @@ -98,41 +211,21 @@ func (s *Server) GetEnvoyBootstrapParams(ctx context.Context, req *pbdataplane.G
}
}

// Build out the response
var serviceName string
if svc.ServiceKind == structs.ServiceKindConnectProxy {
serviceName = svc.ServiceProxy.DestinationServiceName
} else {
serviceName = svc.ServiceName
}

return &pbdataplane.GetEnvoyBootstrapParamsResponse{
Service: serviceName,
Partition: svc.EnterpriseMeta.PartitionOrDefault(),
Namespace: svc.EnterpriseMeta.NamespaceOrDefault(),
Config: bootstrapConfig,
Datacenter: s.Datacenter,
ServiceKind: convertToResponseServiceKind(svc.ServiceKind),
NodeName: svc.Node,
NodeId: string(svc.ID),
AccessLogs: accessLogs,
}, nil
return accessLogs
}

func convertToResponseServiceKind(serviceKind structs.ServiceKind) (respKind pbdataplane.ServiceKind) {
switch serviceKind {
case structs.ServiceKindConnectProxy:
respKind = pbdataplane.ServiceKind_SERVICE_KIND_CONNECT_PROXY
case structs.ServiceKindMeshGateway:
respKind = pbdataplane.ServiceKind_SERVICE_KIND_MESH_GATEWAY
case structs.ServiceKindTerminatingGateway:
respKind = pbdataplane.ServiceKind_SERVICE_KIND_TERMINATING_GATEWAY
case structs.ServiceKindIngressGateway:
respKind = pbdataplane.ServiceKind_SERVICE_KIND_INGRESS_GATEWAY
case structs.ServiceKindAPIGateway:
respKind = pbdataplane.ServiceKind_SERVICE_KIND_API_GATEWAY
case structs.ServiceKindTypical:
respKind = pbdataplane.ServiceKind_SERVICE_KIND_TYPICAL
func isWorkloadSelected(name string, selector *pbcatalog.WorkloadSelector) bool {
for _, prefix := range selector.Prefixes {
if strings.Contains(name, prefix) {
return true
}
}

for _, selectorName := range selector.Names {
if name == selectorName {
return true
}
}
return

return false
}
Loading