diff --git a/agent/connect/uri_service.go b/agent/connect/uri_service.go
index 5f5f5c9bc8a99..e191151346494 100644
--- a/agent/connect/uri_service.go
+++ b/agent/connect/uri_service.go
@@ -8,6 +8,7 @@ import (
 	"net/url"
 
 	"github.com/hashicorp/consul/acl"
+	"github.com/hashicorp/consul/proto-public/pbresource"
 )
 
 // SpiffeIDService is the structure to represent the SPIFFE ID for a service.
@@ -52,3 +53,32 @@ func (id SpiffeIDService) uriPath() string {
 	}
 	return path
 }
+
+// SpiffeIDIdentity is the structure to represent the SPIFFE ID for a
+// workload identity within a given partition and namespace.
+type SpiffeIDIdentity struct {
+	Host      string
+	Partition string
+	Namespace string
+	Identity  string
+}
+
+// URI returns the SPIFFE URI ("spiffe://<host>/ap/.../ns/.../identity/...")
+// for this workload identity.
+func (id SpiffeIDIdentity) URI() *url.URL {
+	var result url.URL
+	result.Scheme = "spiffe"
+	result.Host = id.Host
+	result.Path = fmt.Sprintf("/ap/%s/ns/%s/identity/%s",
+		id.Partition,
+		id.Namespace,
+		id.Identity,
+	)
+	return &result
+}
+
+// SpiffeIDFromIdentityRef constructs the SPIFFE ID string for the given
+// workload identity resource reference under the given trust domain.
+func SpiffeIDFromIdentityRef(trustDomain string, ref *pbresource.Reference) string {
+	return SpiffeIDIdentity{
+		Host:      trustDomain,
+		Partition: ref.Tenancy.Partition,
+		Namespace: ref.Tenancy.Namespace,
+		Identity:  ref.Name,
+	}.URI().String()
+}
diff --git a/agent/consul/server.go b/agent/consul/server.go
index 913b08b25d0d5..02df6d08f9332 100644
--- a/agent/consul/server.go
+++ b/agent/consul/server.go
@@ -19,6 +19,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/hashicorp/consul/agent/connect"
 	"github.com/hashicorp/consul/internal/mesh"
 	"github.com/hashicorp/consul/internal/resource"
@@ -876,7 +877,23 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
 func (s *Server) registerControllers(deps Deps) {
 	if stringslice.Contains(deps.Experiments, catalogResourceExperimentName) {
 		catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())
-		mesh.RegisterControllers(s.controllerManager)
+		mesh.RegisterControllers(s.controllerManager, mesh.ControllerDependencies{
+			TrustDomainFetcher: func() (string, error) {
+				if s.config.CAConfig == nil || s.config.CAConfig.ClusterID == "" {
+					return "", fmt.Errorf("CA has not finished initializing")
+				}
+
+				// Build TrustDomain based on the ClusterID stored.
+				signingID := connect.SpiffeIDSigningForCluster(s.config.CAConfig.ClusterID)
+				if signingID == nil {
+					// If CA is bootstrapped at all then this should never happen but be
+					// defensive.
+					return "", fmt.Errorf("no cluster trust domain setup")
+				}
+
+				return signingID.Host(), nil
+			},
+		})
 	}
 
 	reaper.RegisterControllers(s.controllerManager)
diff --git a/internal/mesh/exports.go b/internal/mesh/exports.go
index 5638daf32b297..7525ba3abe768 100644
--- a/internal/mesh/exports.go
+++ b/internal/mesh/exports.go
@@ -6,6 +6,7 @@ package mesh
 import (
 	"github.com/hashicorp/consul/internal/controller"
 	"github.com/hashicorp/consul/internal/mesh/internal/controllers"
+	"github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh"
 	"github.com/hashicorp/consul/internal/mesh/internal/types"
 	"github.com/hashicorp/consul/internal/resource"
 )
@@ -29,6 +30,13 @@ var (
 	UpstreamsV1Alpha1Type              = types.UpstreamsV1Alpha1Type
 	UpstreamsConfigurationV1Alpha1Type = types.UpstreamsConfigurationV1Alpha1Type
 	ProxyStateTemplateV1Alpha1Type     = types.ProxyStateTemplateV1Alpha1Type
+
+	// Resource Types for the latest version.
+
+	ProxyConfigurationType     = types.ProxyConfigurationV1Alpha1Type
+	UpstreamsType              = types.UpstreamsV1Alpha1Type
+	UpstreamsConfigurationType = types.UpstreamsConfigurationV1Alpha1Type
+	ProxyStateTemplateType     = types.ProxyStateTemplateV1Alpha1Type
 )
 
 // RegisterTypes adds all resource types within the "mesh" API group
@@ -39,6 +47,10 @@ func RegisterTypes(r resource.Registry) {
 
 // RegisterControllers registers controllers for the mesh types with
 // the given controller Manager.
-func RegisterControllers(mgr *controller.Manager) { - controllers.Register(mgr) +func RegisterControllers(mgr *controller.Manager, deps ControllerDependencies) { + controllers.Register(mgr, deps) } + +type TrustDomainFetcher = mesh.TrustDomainFetcher + +type ControllerDependencies = controllers.Dependencies diff --git a/internal/mesh/internal/controllers/mesh/builder/builder.go b/internal/mesh/internal/controllers/mesh/builder/builder.go index b367ef9f84fc5..d675dbec7d6dd 100644 --- a/internal/mesh/internal/controllers/mesh/builder/builder.go +++ b/internal/mesh/internal/controllers/mesh/builder/builder.go @@ -1,169 +1,42 @@ package builder import ( - "fmt" - - "github.com/hashicorp/consul/envoyextensions/xdscommon" - pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" "github.com/hashicorp/consul/proto-public/pbresource" ) +// Builder builds a ProxyStateTemplate. 
type Builder struct { id *pbresource.ID proxyStateTemplate *pbmesh.ProxyStateTemplate - - lastBuiltListener lastListenerData + trustDomain string } -type lastListenerData struct { - index int -} - -func New(id *pbresource.ID, identity *pbresource.Reference) *Builder { +func New(id *pbresource.ID, identity *pbresource.Reference, trustDomain string) *Builder { return &Builder{ - id: id, + id: id, + trustDomain: trustDomain, proxyStateTemplate: &pbmesh.ProxyStateTemplate{ ProxyState: &pbmesh.ProxyState{ Identity: identity, - Clusters: make(map[string]*pbmesh.Cluster), - Endpoints: make(map[string]*pbmesh.Endpoints), + Clusters: make(map[string]*pbproxystate.Cluster), + Endpoints: make(map[string]*pbproxystate.Endpoints), }, - RequiredEndpoints: make(map[string]*pbmesh.EndpointRef), - RequiredLeafCertificates: make(map[string]*pbmesh.LeafCertificateRef), - RequiredTrustBundles: make(map[string]*pbmesh.TrustBundleRef), + RequiredEndpoints: make(map[string]*pbproxystate.EndpointRef), + RequiredLeafCertificates: make(map[string]*pbproxystate.LeafCertificateRef), + RequiredTrustBundles: make(map[string]*pbproxystate.TrustBundleRef), }, } } func (b *Builder) Build() *pbmesh.ProxyStateTemplate { - b.lastBuiltListener = lastListenerData{} return b.proxyStateTemplate } -func (b *Builder) AddInboundListener(name string, workload *pbcatalog.Workload) *Builder { - listener := &pbmesh.Listener{ - Name: name, - Direction: pbmesh.Direction_DIRECTION_INBOUND, - } - - // We will take listener bind port from the workload for now. - // Find mesh port. - var meshPort string - for portName, port := range workload.Ports { - if port.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { - meshPort = portName - break - } - } - - // Check if the workload has a specific address for the mesh port. 
- var meshAddress string - for _, address := range workload.Addresses { - for _, port := range address.Ports { - if port == meshPort { - meshAddress = address.Host - } - } - } - // Otherwise, assume the first address in the addresses list. - if meshAddress == "" { - // It is safe to assume that there's at least one address because we validate it when creating the workload. - meshAddress = workload.Addresses[0].Host - } - - listener.BindAddress = &pbmesh.Listener_IpPort{ - IpPort: &pbmesh.IPPortAddress{ - Ip: meshAddress, - Port: workload.Ports[meshPort].Port, - }, - } - - // Track the last added listener. - b.lastBuiltListener.index = len(b.proxyStateTemplate.ProxyState.Listeners) +func (b *Builder) addListener(l *pbproxystate.Listener) *Builder { // Add listener to proxy state template - b.proxyStateTemplate.ProxyState.Listeners = append(b.proxyStateTemplate.ProxyState.Listeners, listener) - - return b -} + b.proxyStateTemplate.ProxyState.Listeners = append(b.proxyStateTemplate.ProxyState.Listeners, l) -func (b *Builder) AddInboundRouters(workload *pbcatalog.Workload) *Builder { - listener := b.proxyStateTemplate.ProxyState.Listeners[b.lastBuiltListener.index] - - // Go through workload ports and add the first non-mesh port we see. - // todo (ishustava): Note we will need to support multiple ports in the future. - // todo (ishustava): make sure we always iterate through ports in the same order so we don't need to send more updates to envoy. - for portName, port := range workload.Ports { - clusterName := fmt.Sprintf("%s:%s", xdscommon.LocalAppClusterName, portName) - if port.Protocol == pbcatalog.Protocol_PROTOCOL_TCP { - r := &pbmesh.Router{ - Destination: &pbmesh.Router_L4{ - L4: &pbmesh.L4Destination{ - Name: clusterName, - StatPrefix: listener.Name, - }, - }, - } - listener.Routers = append(listener.Routers, r) - - // Make cluster for this router destination. 
- b.proxyStateTemplate.ProxyState.Clusters[clusterName] = &pbmesh.Cluster{ - Group: &pbmesh.Cluster_EndpointGroup{ - EndpointGroup: &pbmesh.EndpointGroup{ - Group: &pbmesh.EndpointGroup_Static{ - Static: &pbmesh.StaticEndpointGroup{ - Name: clusterName, - }, - }, - }, - }, - } - - // Finally, add static endpoints. We're adding it statically as opposed to creating an endpoint ref - // because this endpoint is less likely to change as we're not tracking the health. - endpoint := &pbmesh.Endpoint{ - Address: &pbmesh.Endpoint_HostPort{ - HostPort: &pbmesh.HostPortAddress{ - Host: "127.0.0.1", - Port: port.Port, - }, - }, - } - b.proxyStateTemplate.ProxyState.Endpoints[clusterName] = &pbmesh.Endpoints{ - Name: clusterName, - Endpoints: []*pbmesh.Endpoint{endpoint}, - } - break - } - } - return b -} - -func (b *Builder) AddInboundTLS() *Builder { - listener := b.proxyStateTemplate.ProxyState.Listeners[b.lastBuiltListener.index] - // For inbound TLS, we want to use this proxy's identity. - workloadIdentity := b.proxyStateTemplate.ProxyState.Identity.Name - - inboundTLS := &pbmesh.TransportSocket{ - ConnectionTls: &pbmesh.TransportSocket_InboundMesh{ - InboundMesh: &pbmesh.InboundMeshMTLS{ - IdentityKey: workloadIdentity, - ValidationContext: &pbmesh.MeshInboundValidationContext{TrustBundlePeerNameKeys: []string{b.id.Tenancy.PeerName}}, - }, - }, - } - b.proxyStateTemplate.RequiredLeafCertificates[workloadIdentity] = &pbmesh.LeafCertificateRef{ - Name: workloadIdentity, - Namespace: b.id.Tenancy.Namespace, - Partition: b.id.Tenancy.Partition, - } - - b.proxyStateTemplate.RequiredTrustBundles[b.id.Tenancy.PeerName] = &pbmesh.TrustBundleRef{ - Peer: b.id.Tenancy.PeerName, - } - - for i := range listener.Routers { - listener.Routers[i].InboundTls = inboundTLS - } return b } diff --git a/internal/mesh/internal/controllers/mesh/builder/builder_test.go b/internal/mesh/internal/controllers/mesh/builder/builder_test.go index 1f0ee988db928..5c806424b925c 100644 --- 
a/internal/mesh/internal/controllers/mesh/builder/builder_test.go +++ b/internal/mesh/internal/controllers/mesh/builder/builder_test.go @@ -1,238 +1,47 @@ package builder import ( + "flag" + "os" + "path/filepath" "testing" - "github.com/hashicorp/consul/internal/mesh/internal/types" - "github.com/hashicorp/consul/internal/resource/resourcetest" - pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" - pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" - "github.com/hashicorp/consul/proto-public/pbresource" - "github.com/hashicorp/consul/proto/private/prototest" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" ) -func TestAddInboundListener(t *testing.T) { - listenerName := "test-listener" - - cases := map[string]struct { - workload *pbcatalog.Workload - expListener *pbmesh.Listener - }{ - "single workload address without ports": { - workload: &pbcatalog.Workload{ - Addresses: []*pbcatalog.WorkloadAddress{ - { - Host: "10.0.0.1", - }, - }, - Ports: map[string]*pbcatalog.WorkloadPort{ - "port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, - "port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, - }, - }, - expListener: &pbmesh.Listener{ - Name: listenerName, - Direction: pbmesh.Direction_DIRECTION_INBOUND, - BindAddress: &pbmesh.Listener_IpPort{ - IpPort: &pbmesh.IPPortAddress{ - Ip: "10.0.0.1", - Port: 20000, - }, - }, - }, - }, - "multiple workload addresses without ports: prefer first address": { - workload: &pbcatalog.Workload{ - Addresses: []*pbcatalog.WorkloadAddress{ - { - Host: "10.0.0.1", - }, - { - Host: "10.0.0.2", - }, - }, - Ports: map[string]*pbcatalog.WorkloadPort{ - "port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, - "port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, - }, - }, - expListener: &pbmesh.Listener{ - Name: listenerName, - Direction: pbmesh.Direction_DIRECTION_INBOUND, - 
BindAddress: &pbmesh.Listener_IpPort{ - IpPort: &pbmesh.IPPortAddress{ - Ip: "10.0.0.1", - Port: 20000, - }, - }, - }, - }, - "multiple workload addresses with specific ports": { - workload: &pbcatalog.Workload{ - Addresses: []*pbcatalog.WorkloadAddress{ - { - Host: "127.0.0.1", - Ports: []string{"port1"}, - }, - { - Host: "10.0.0.2", - Ports: []string{"port2"}, - }, - }, - Ports: map[string]*pbcatalog.WorkloadPort{ - "port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, - "port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, - }, - }, - expListener: &pbmesh.Listener{ - Name: listenerName, - Direction: pbmesh.Direction_DIRECTION_INBOUND, - BindAddress: &pbmesh.Listener_IpPort{ - IpPort: &pbmesh.IPPortAddress{ - Ip: "10.0.0.2", - Port: 20000, - }, - }, - }, - }, - } - - for name, c := range cases { - t.Run(name, func(t *testing.T) { - proxyStateTemplateID := testProxyStateTemplateID() +var ( + update = flag.Bool("update", false, "update the golden files of this test") +) - proxyStateTemplate := New(proxyStateTemplateID, testIdentityRef()).AddInboundListener(listenerName, c.workload).Build() - require.Len(t, proxyStateTemplate.ProxyState.Listeners, 1) - prototest.AssertDeepEqual(t, c.expListener, proxyStateTemplate.ProxyState.Listeners[0]) - }) - } +func TestMain(m *testing.M) { + flag.Parse() + os.Exit(m.Run()) } -func TestAddInboundRouters(t *testing.T) { - workload := testWorkload() - - // Create new builder - builder := New(testProxyStateTemplateID(), testIdentityRef()). - AddInboundListener("test-listener", workload). 
- AddInboundRouters(workload) - - clusterName := "local_app:port1" - expRouters := []*pbmesh.Router{ - { - Destination: &pbmesh.Router_L4{ - L4: &pbmesh.L4Destination{ - Name: clusterName, - StatPrefix: "test-listener", - }, - }, - }, +func protoToJSON(t *testing.T, pb proto.Message) string { + t.Helper() + m := protojson.MarshalOptions{ + Indent: " ", } - expCluster := &pbmesh.Cluster{ - Group: &pbmesh.Cluster_EndpointGroup{ - EndpointGroup: &pbmesh.EndpointGroup{ - Group: &pbmesh.EndpointGroup_Static{ - Static: &pbmesh.StaticEndpointGroup{ - Name: clusterName, - }, - }, - }, - }, - } - - expEndpoints := &pbmesh.Endpoints{ - Name: clusterName, - Endpoints: []*pbmesh.Endpoint{ - { - Address: &pbmesh.Endpoint_HostPort{ - HostPort: &pbmesh.HostPortAddress{ - Host: "127.0.0.1", - Port: 8080, - }, - }, - }, - }, - } - - proxyStateTemplate := builder.Build() - - // Check routers. - require.Len(t, proxyStateTemplate.ProxyState.Listeners, 1) - prototest.AssertDeepEqual(t, expRouters, proxyStateTemplate.ProxyState.Listeners[0].Routers) - - // Check that the cluster exists in the clusters map. - prototest.AssertDeepEqual(t, expCluster, proxyStateTemplate.ProxyState.Clusters[clusterName]) - - // Check that the endpoints exist in the endpoint map for this cluster name. - prototest.AssertDeepEqual(t, expEndpoints, proxyStateTemplate.ProxyState.Endpoints[clusterName]) + gotJSON, err := m.Marshal(pb) + require.NoError(t, err) + return string(gotJSON) } -func TestAddInboundTLS(t *testing.T) { - id := testProxyStateTemplateID() - workload := testWorkload() - - proxyStateTemplate := New(id, testIdentityRef()). - AddInboundListener("test-listener", workload). - AddInboundRouters(workload). - AddInboundTLS(). 
- Build() +func goldenValue(t *testing.T, goldenFile string, actual string, update bool) string { + t.Helper() + goldenPath := filepath.Join("testdata", goldenFile) + ".golden" - expTransportSocket := &pbmesh.TransportSocket{ - ConnectionTls: &pbmesh.TransportSocket_InboundMesh{ - InboundMesh: &pbmesh.InboundMeshMTLS{ - IdentityKey: workload.Identity, - ValidationContext: &pbmesh.MeshInboundValidationContext{ - TrustBundlePeerNameKeys: []string{id.Tenancy.PeerName}}, - }, - }, - } - expLeafCertRef := &pbmesh.LeafCertificateRef{ - Name: workload.Identity, - Namespace: id.Tenancy.Namespace, - Partition: id.Tenancy.Partition, - } + if update { + err := os.WriteFile(goldenPath, []byte(actual), 0644) + require.NoError(t, err) - require.Len(t, proxyStateTemplate.ProxyState.Listeners, 1) - // Check that each router has the same TLS configuration. - for _, router := range proxyStateTemplate.ProxyState.Listeners[0].Routers { - prototest.AssertDeepEqual(t, expTransportSocket, router.InboundTls) + return actual } - // Check that there's a leaf cert ref added to the map. - prototest.AssertDeepEqual(t, expLeafCertRef, proxyStateTemplate.RequiredLeafCertificates[workload.Identity]) - - // Check that there's trust bundle name added to the trust bundles names. 
- _, ok := proxyStateTemplate.RequiredTrustBundles[id.Tenancy.PeerName] - require.True(t, ok) -} - -func testProxyStateTemplateID() *pbresource.ID { - return resourcetest.Resource(types.ProxyStateTemplateType, "test").ID() -} - -func testIdentityRef() *pbresource.Reference { - return &pbresource.Reference{ - Name: "test-identity", - Tenancy: &pbresource.Tenancy{ - Namespace: "default", - Partition: "default", - PeerName: "local", - }, - } -} - -func testWorkload() *pbcatalog.Workload { - return &pbcatalog.Workload{ - Identity: "test-identity", - Addresses: []*pbcatalog.WorkloadAddress{ - { - Host: "10.0.0.1", - }, - }, - Ports: map[string]*pbcatalog.WorkloadPort{ - "port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, - "port2": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - "port3": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, - }, - } + content, err := os.ReadFile(goldenPath) + require.NoError(t, err) + return string(content) } diff --git a/internal/mesh/internal/controllers/mesh/builder/destination_builder.go b/internal/mesh/internal/controllers/mesh/builder/destination_builder.go new file mode 100644 index 0000000000000..062ceb02f1eb0 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/destination_builder.go @@ -0,0 +1,132 @@ +package builder + +import ( + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +func (b *Builder) BuildDestinations(destinations []*intermediate.Destination) *Builder { + for _, destination := range destinations { + if destination.Explicit != nil { + b.buildExplicitDestination(destination) + } + } + + return b +} + +func (b *Builder) 
buildExplicitDestination(destination *intermediate.Destination) *Builder { + clusterName := DestinationClusterName(destination.Explicit.DestinationRef, destination.Explicit.Datacenter, b.trustDomain) + statPrefix := DestinationStatPrefix(destination.Explicit.DestinationRef, destination.Explicit.Datacenter) + + // We assume that all endpoints have the same port. Later, we will change service endpoints to + // have the global ports map rather than per address. + destPort := destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports[destination.Explicit.DestinationPort] + + if destPort != nil { + return b.addOutboundDestinationListener(destination.Explicit). + addRouter(clusterName, statPrefix, destPort.Protocol). + addCluster(clusterName, destination.Identities). + addEndpointsRef(clusterName, destination.ServiceEndpoints.Resource.Id, destination.Explicit.DestinationPort) + } + + return b +} + +func (b *Builder) addOutboundDestinationListener(explicit *pbmesh.Upstream) *Builder { + listener := &pbproxystate.Listener{ + Direction: pbproxystate.Direction_DIRECTION_OUTBOUND, + } + + // Create outbound listener address. 
+ switch explicit.ListenAddr.(type) { + case *pbmesh.Upstream_IpPort: + destinationAddr := explicit.ListenAddr.(*pbmesh.Upstream_IpPort) + listener.BindAddress = &pbproxystate.Listener_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: destinationAddr.IpPort.Ip, + Port: destinationAddr.IpPort.Port, + }, + } + listener.Name = DestinationListenerName(explicit.DestinationRef.Name, explicit.DestinationPort, destinationAddr.IpPort.Ip, destinationAddr.IpPort.Port) + case *pbmesh.Upstream_Unix: + destinationAddr := explicit.ListenAddr.(*pbmesh.Upstream_Unix) + listener.BindAddress = &pbproxystate.Listener_UnixSocket{ + UnixSocket: &pbproxystate.UnixSocketAddress{ + Path: destinationAddr.Unix.Path, + Mode: destinationAddr.Unix.Mode, + }, + } + listener.Name = DestinationListenerName(explicit.DestinationRef.Name, explicit.DestinationPort, destinationAddr.Unix.Path, 0) + } + + return b.addListener(listener) +} + +// for explicit destinations, we have no filter chain match, and filters based on port protocol +func (b *Builder) addRouter(clusterName, statPrefix string, protocol pbcatalog.Protocol) *Builder { + listener := b.getLastBuiltListener() + + switch protocol { + case pbcatalog.Protocol_PROTOCOL_TCP: + router := &pbproxystate.Router{ + Destination: &pbproxystate.Router_L4{ + L4: &pbproxystate.L4Destination{ + Name: clusterName, + StatPrefix: statPrefix, + }, + }, + } + listener.Routers = append(listener.Routers, router) + } + return b +} + +func (b *Builder) addCluster(clusterName string, destinationIdentities []*pbresource.Reference) *Builder { + var spiffeIDs []string + for _, identity := range destinationIdentities { + spiffeIDs = append(spiffeIDs, connect.SpiffeIDFromIdentityRef(b.trustDomain, identity)) + } + + // Create destination cluster + cluster := &pbproxystate.Cluster{ + Group: &pbproxystate.Cluster_EndpointGroup{ + EndpointGroup: &pbproxystate.EndpointGroup{ + Group: &pbproxystate.EndpointGroup_Dynamic{ + Dynamic: 
&pbproxystate.DynamicEndpointGroup{ + Config: &pbproxystate.DynamicEndpointGroupConfig{ + DisablePanicThreshold: true, + }, + OutboundTls: &pbproxystate.TransportSocket{ + ConnectionTls: &pbproxystate.TransportSocket_OutboundMesh{ + OutboundMesh: &pbproxystate.OutboundMeshMTLS{ + IdentityKey: b.proxyStateTemplate.ProxyState.Identity.Name, + ValidationContext: &pbproxystate.MeshOutboundValidationContext{ + SpiffeIds: spiffeIDs, + }, + Sni: clusterName, + }, + }, + }, + }, + }, + }, + }, + } + + b.proxyStateTemplate.ProxyState.Clusters[clusterName] = cluster + return b +} + +func (b *Builder) addEndpointsRef(clusterName string, serviceEndpointsID *pbresource.ID, destinationPort string) *Builder { + // Finally, add endpoints references. + b.proxyStateTemplate.RequiredEndpoints[clusterName] = &pbproxystate.EndpointRef{ + Id: serviceEndpointsID, + Port: destinationPort, + } + return b +} diff --git a/internal/mesh/internal/controllers/mesh/builder/destination_builder_test.go b/internal/mesh/internal/controllers/mesh/builder/destination_builder_test.go new file mode 100644 index 0000000000000..6d8371662fd48 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/destination_builder_test.go @@ -0,0 +1,106 @@ +package builder + +import ( + "testing" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/stretchr/testify/require" +) + +var ( + endpointsData = &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1"}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp": {Port: 8080, Protocol: 
pbcatalog.Protocol_PROTOCOL_TCP}, + "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + }, + }, + }, + } +) + +func TestBuildExplicitDestinations(t *testing.T) { + api1Endpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-1"). + WithData(t, endpointsData).Build() + + api2Endpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-2"). + WithData(t, endpointsData).Build() + + api1Identity := &pbresource.Reference{ + Name: "api1-identity", + Tenancy: api1Endpoints.Id.Tenancy, + } + + api2Identity := &pbresource.Reference{ + Name: "api2-identity", + Tenancy: api2Endpoints.Id.Tenancy, + } + + destinationIpPort := &intermediate.Destination{ + Explicit: &pbmesh.Upstream{ + DestinationRef: resource.Reference(api1Endpoints.Id, ""), + DestinationPort: "tcp", + Datacenter: "dc1", + ListenAddr: &pbmesh.Upstream_IpPort{ + IpPort: &pbmesh.IPPortAddress{Ip: "1.1.1.1", Port: 1234}, + }, + }, + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api1Endpoints, + Endpoints: endpointsData, + }, + Identities: []*pbresource.Reference{api1Identity}, + } + + destinationUnix := &intermediate.Destination{ + Explicit: &pbmesh.Upstream{ + DestinationRef: resource.Reference(api2Endpoints.Id, ""), + DestinationPort: "tcp", + Datacenter: "dc1", + ListenAddr: &pbmesh.Upstream_Unix{ + Unix: &pbmesh.UnixSocketAddress{Path: "/path/to/socket", Mode: "0666"}, + }, + }, + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api2Endpoints, + Endpoints: endpointsData, + }, + Identities: []*pbresource.Reference{api2Identity}, + } + + cases := map[string]struct { + destinations []*intermediate.Destination + }{ + "l4-single-destination-ip-port-bind-address": { + destinations: []*intermediate.Destination{destinationIpPort}, + }, + "l4-single-destination-unix-socket-bind-address": { + destinations: []*intermediate.Destination{destinationUnix}, + }, + "l4-multi-destination": { + destinations: []*intermediate.Destination{destinationIpPort, 
destinationUnix}, + }, + } + + for name, c := range cases { + proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul"). + BuildDestinations(c.destinations). + Build() + + actual := protoToJSON(t, proxyTmpl) + expected := goldenValue(t, name, actual, *update) + + require.Equal(t, expected, actual) + } + +} diff --git a/internal/mesh/internal/controllers/mesh/builder/local_app.go b/internal/mesh/internal/controllers/mesh/builder/local_app.go new file mode 100644 index 0000000000000..5082ce298eed6 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/local_app.go @@ -0,0 +1,139 @@ +package builder + +import ( + "fmt" + + "github.com/hashicorp/consul/envoyextensions/xdscommon" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" +) + +func (b *Builder) BuildLocalApp(workload *pbcatalog.Workload) *Builder { + return b.addInboundListener(xdscommon.PublicListenerName, workload). + addInboundRouters(workload). + addInboundTLS() +} + +func (b *Builder) getLastBuiltListener() *pbproxystate.Listener { + lastBuiltIndex := len(b.proxyStateTemplate.ProxyState.Listeners) - 1 + return b.proxyStateTemplate.ProxyState.Listeners[lastBuiltIndex] +} + +func (b *Builder) addInboundListener(name string, workload *pbcatalog.Workload) *Builder { + listener := &pbproxystate.Listener{ + Name: name, + Direction: pbproxystate.Direction_DIRECTION_INBOUND, + } + + // We will take listener bind port from the workload for now. + // Find mesh port. + var meshPort string + for portName, port := range workload.Ports { + if port.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + meshPort = portName + break + } + } + + // Check if the workload has a specific address for the mesh port. 
+ var meshAddress string + for _, address := range workload.Addresses { + for _, port := range address.Ports { + if port == meshPort { + meshAddress = address.Host + } + } + } + // Otherwise, assume the first address in the addresses list. + if meshAddress == "" { + // It is safe to assume that there's at least one address because we validate it when creating the workload. + meshAddress = workload.Addresses[0].Host + } + + listener.BindAddress = &pbproxystate.Listener_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: meshAddress, + Port: workload.Ports[meshPort].Port, + }, + } + + return b.addListener(listener) +} + +func (b *Builder) addInboundRouters(workload *pbcatalog.Workload) *Builder { + listener := b.getLastBuiltListener() + + // Go through workload ports and add the first non-mesh port we see. + // todo (ishustava): Note we will need to support multiple ports in the future. + // todo (ishustava): make sure we always iterate through ports in the same order so we don't need to send more updates to envoy. + for portName, port := range workload.Ports { + clusterName := fmt.Sprintf("%s:%s", xdscommon.LocalAppClusterName, portName) + if port.Protocol == pbcatalog.Protocol_PROTOCOL_TCP { + r := &pbproxystate.Router{ + Destination: &pbproxystate.Router_L4{ + L4: &pbproxystate.L4Destination{ + Name: clusterName, + StatPrefix: listener.Name, + }, + }, + } + listener.Routers = append(listener.Routers, r) + + // Make cluster for this router destination. + b.proxyStateTemplate.ProxyState.Clusters[clusterName] = &pbproxystate.Cluster{ + Group: &pbproxystate.Cluster_EndpointGroup{ + EndpointGroup: &pbproxystate.EndpointGroup{ + Group: &pbproxystate.EndpointGroup_Static{ + Static: &pbproxystate.StaticEndpointGroup{}, + }, + }, + }, + } + + // Finally, add static endpoints. We're adding it statically as opposed to creating an endpoint ref + // because this endpoint is less likely to change as we're not tracking the health. 
+ endpoint := &pbproxystate.Endpoint{ + Address: &pbproxystate.Endpoint_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "127.0.0.1", + Port: port.Port, + }, + }, + } + b.proxyStateTemplate.ProxyState.Endpoints[clusterName] = &pbproxystate.Endpoints{ + Endpoints: []*pbproxystate.Endpoint{endpoint}, + } + break + } + } + return b +} + +func (b *Builder) addInboundTLS() *Builder { + listener := b.getLastBuiltListener() + // For inbound TLS, we want to use this proxy's identity. + workloadIdentity := b.proxyStateTemplate.ProxyState.Identity.Name + + inboundTLS := &pbproxystate.TransportSocket{ + ConnectionTls: &pbproxystate.TransportSocket_InboundMesh{ + InboundMesh: &pbproxystate.InboundMeshMTLS{ + IdentityKey: workloadIdentity, + ValidationContext: &pbproxystate.MeshInboundValidationContext{TrustBundlePeerNameKeys: []string{b.id.Tenancy.PeerName}}, + }, + }, + } + b.proxyStateTemplate.RequiredLeafCertificates[workloadIdentity] = &pbproxystate.LeafCertificateRef{ + Name: workloadIdentity, + Namespace: b.id.Tenancy.Namespace, + Partition: b.id.Tenancy.Partition, + } + + b.proxyStateTemplate.RequiredTrustBundles[b.id.Tenancy.PeerName] = &pbproxystate.TrustBundleRef{ + Peer: b.id.Tenancy.PeerName, + } + + for i := range listener.Routers { + listener.Routers[i].InboundTls = inboundTLS + } + return b +} diff --git a/internal/mesh/internal/controllers/mesh/builder/local_app_test.go b/internal/mesh/internal/controllers/mesh/builder/local_app_test.go new file mode 100644 index 0000000000000..5a71e282a66d3 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/local_app_test.go @@ -0,0 +1,91 @@ +package builder + +import ( + "testing" + + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/stretchr/testify/require" +) + +func 
TestBuildLocalApp(t *testing.T) { + cases := map[string]struct { + workload *pbcatalog.Workload + }{ + "l4-single-workload-address-without-ports": { + workload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.0.0.1", + }, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + }, + }, + "l4-multiple-workload-addresses-without-ports": { + workload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.0.0.1", + }, + { + Host: "10.0.0.2", + }, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + }, + }, + "l4-multiple-workload-addresses-with-specific-ports": { + workload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "127.0.0.1", + Ports: []string{"port1"}, + }, + { + Host: "10.0.0.2", + Ports: []string{"port2"}, + }, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + }, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul").BuildLocalApp(c.workload). 
+ Build() + actual := protoToJSON(t, proxyTmpl) + expected := goldenValue(t, name, actual, *update) + + require.Equal(t, expected, actual) + }) + } +} + +func testProxyStateTemplateID() *pbresource.ID { + return resourcetest.Resource(types.ProxyStateTemplateType, "test").ID() +} + +func testIdentityRef() *pbresource.Reference { + return &pbresource.Reference{ + Name: "test-identity", + Tenancy: &pbresource.Tenancy{ + Namespace: "default", + Partition: "default", + PeerName: "local", + }, + } +} diff --git a/internal/mesh/internal/controllers/mesh/builder/naming.go b/internal/mesh/internal/controllers/mesh/builder/naming.go new file mode 100644 index 0000000000000..0186dc8c0b2ab --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/naming.go @@ -0,0 +1,33 @@ +package builder + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +func DestinationClusterName(serviceRef *pbresource.Reference, datacenter, trustDomain string) string { + return connect.ServiceSNI(serviceRef.Name, + "", + serviceRef.Tenancy.Namespace, + serviceRef.Tenancy.Partition, + datacenter, + trustDomain) +} + +func DestinationStatPrefix(serviceRef *pbresource.Reference, datacenter string) string { + return fmt.Sprintf("upstream.%s.%s.%s.%s", + serviceRef.Name, + serviceRef.Tenancy.Namespace, + serviceRef.Tenancy.Partition, + datacenter) +} + +func DestinationListenerName(name, portName string, address string, port uint32) string { + if port != 0 { + return fmt.Sprintf("%s:%s:%s:%d", name, portName, address, port) + } + + return fmt.Sprintf("%s:%s:%s", name, portName, address) +} diff --git a/internal/mesh/internal/controllers/mesh/builder/testdata/l4-multi-destination.golden b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-multi-destination.golden new file mode 100644 index 0000000000000..dae364f5c07dc --- /dev/null +++ 
b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-multi-destination.golden @@ -0,0 +1,122 @@ +{ + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + }, + "name": "test-identity" + }, + "listeners": [ + { + "name": "api-1:tcp:1.1.1.1:1234", + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "1.1.1.1", + "port": 1234 + }, + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + } + } + ] + }, + { + "name": "api-2:tcp:/path/to/socket", + "direction": "DIRECTION_OUTBOUND", + "unixSocket": { + "path": "/path/to/socket", + "mode": "0666" + }, + "routers": [ + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + } + } + ] + } + ], + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + }, + "sni": "api-1.default.dc1.internal.foo.consul" + } + } + } + } + }, + "api-2.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" + ] + }, + "sni": "api-2.default.dc1.internal.foo.consul" + } + } + } + } + } + } + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + }, + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + } + }, + "port": "tcp" + 
}, + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + }, + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + } + }, + "port": "tcp" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/mesh/builder/testdata/l4-multiple-workload-addresses-with-specific-ports.golden b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-multiple-workload-addresses-with-specific-ports.golden new file mode 100644 index 0000000000000..ff3d8ef0c0726 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-multiple-workload-addresses-with-specific-ports.golden @@ -0,0 +1,71 @@ +{ + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + }, + "name": "test-identity" + }, + "listeners": [ + { + "name": "public_listener", + "direction": "DIRECTION_INBOUND", + "hostPort": { + "host": "10.0.0.2", + "port": 20000 + }, + "routers": [ + { + "l4": { + "name": "local_app:port1", + "statPrefix": "public_listener" + }, + "inboundTls": { + "inboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "trustBundlePeerNameKeys": [ + "local" + ] + } + } + } + } + ] + } + ], + "clusters": { + "local_app:port1": { + "endpointGroup": { + "static": {} + } + } + }, + "endpoints": { + "local_app:port1": { + "endpoints": [ + { + "hostPort": { + "host": "127.0.0.1", + "port": 8080 + } + } + ] + } + } + }, + "requiredLeafCertificates": { + "test-identity": { + "name": "test-identity", + "namespace": "default", + "partition": "default" + } + }, + "requiredTrustBundles": { + "local": { + "peer": "local" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/mesh/builder/testdata/l4-multiple-workload-addresses-without-ports.golden 
b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-multiple-workload-addresses-without-ports.golden new file mode 100644 index 0000000000000..9c22e94d5974d --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-multiple-workload-addresses-without-ports.golden @@ -0,0 +1,71 @@ +{ + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + }, + "name": "test-identity" + }, + "listeners": [ + { + "name": "public_listener", + "direction": "DIRECTION_INBOUND", + "hostPort": { + "host": "10.0.0.1", + "port": 20000 + }, + "routers": [ + { + "l4": { + "name": "local_app:port1", + "statPrefix": "public_listener" + }, + "inboundTls": { + "inboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "trustBundlePeerNameKeys": [ + "local" + ] + } + } + } + } + ] + } + ], + "clusters": { + "local_app:port1": { + "endpointGroup": { + "static": {} + } + } + }, + "endpoints": { + "local_app:port1": { + "endpoints": [ + { + "hostPort": { + "host": "127.0.0.1", + "port": 8080 + } + } + ] + } + } + }, + "requiredLeafCertificates": { + "test-identity": { + "name": "test-identity", + "namespace": "default", + "partition": "default" + } + }, + "requiredTrustBundles": { + "local": { + "peer": "local" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/mesh/builder/testdata/l4-single-destination-ip-port-bind-address.golden b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-single-destination-ip-port-bind-address.golden new file mode 100644 index 0000000000000..44c97ca76f5c6 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-single-destination-ip-port-bind-address.golden @@ -0,0 +1,70 @@ +{ + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + }, + "name": "test-identity" + }, + "listeners": [ + { + "name": 
"api-1:tcp:1.1.1.1:1234", + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "1.1.1.1", + "port": 1234 + }, + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + } + } + ] + } + ], + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + }, + "sni": "api-1.default.dc1.internal.foo.consul" + } + } + } + } + } + } + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + }, + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + } + }, + "port": "tcp" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/mesh/builder/testdata/l4-single-destination-unix-socket-bind-address.golden b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-single-destination-unix-socket-bind-address.golden new file mode 100644 index 0000000000000..2dbaa61a1ff3a --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-single-destination-unix-socket-bind-address.golden @@ -0,0 +1,70 @@ +{ + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + }, + "name": "test-identity" + }, + "listeners": [ + { + "name": "api-2:tcp:/path/to/socket", + "direction": "DIRECTION_OUTBOUND", + "unixSocket": { + "path": "/path/to/socket", + "mode": "0666" + }, + "routers": [ + { + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + } + } + ] + } + ], + "clusters": { + 
"api-2.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" + ] + }, + "sni": "api-2.default.dc1.internal.foo.consul" + } + } + } + } + } + } + }, + "requiredEndpoints": { + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + }, + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + } + }, + "port": "tcp" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/mesh/builder/testdata/l4-single-workload-address-without-ports.golden b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-single-workload-address-without-ports.golden new file mode 100644 index 0000000000000..9c22e94d5974d --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/builder/testdata/l4-single-workload-address-without-ports.golden @@ -0,0 +1,71 @@ +{ + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + }, + "name": "test-identity" + }, + "listeners": [ + { + "name": "public_listener", + "direction": "DIRECTION_INBOUND", + "hostPort": { + "host": "10.0.0.1", + "port": 20000 + }, + "routers": [ + { + "l4": { + "name": "local_app:port1", + "statPrefix": "public_listener" + }, + "inboundTls": { + "inboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "trustBundlePeerNameKeys": [ + "local" + ] + } + } + } + } + ] + } + ], + "clusters": { + "local_app:port1": { + "endpointGroup": { + "static": {} + } + } + }, + "endpoints": { + "local_app:port1": { + "endpoints": [ + { + "hostPort": { + "host": "127.0.0.1", + "port": 8080 + } + } + ] + } + } + }, + 
"requiredLeafCertificates": { + "test-identity": { + "name": "test-identity", + "namespace": "default", + "partition": "default" + } + }, + "requiredTrustBundles": { + "local": { + "peer": "local" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/mesh/cache/cache.go b/internal/mesh/internal/controllers/mesh/cache/cache.go new file mode 100644 index 0000000000000..5e8dd7d8b12d4 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/cache/cache.go @@ -0,0 +1,144 @@ +package cache + +import ( + "fmt" + "sync" + + "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// Cache stores information needed for the mesh controller to reconcile efficiently. +// This currently means storing a list of all destinations for easy look up +// as well as indices of source proxies where those destinations are referenced. +// +// It is the responsibility of controller and its subcomponents (like mapper and data fetcher) +// to keep this cache up-to-date as we're observing new data. +type Cache struct { + lock sync.RWMutex + + // store is a map from destination service reference and port as a string ID + // to the object representing destination reference. + store map[string]*intermediate.CombinedDestinationRef + + // sourceProxiesIndex stores a map from a string representation of source proxy ID + // to the keys in the store map. 
+ sourceProxiesIndex map[string]storeKeys +} + +type storeKeys map[string]struct{} + +func New() *Cache { + return &Cache{ + store: make(map[string]*intermediate.CombinedDestinationRef), + sourceProxiesIndex: make(map[string]storeKeys), + } +} + +func KeyFromID(id *pbresource.ID) string { + return fmt.Sprintf("%s/%s/%s", + resource.ToGVK(id.Type), + resource.TenancyToString(id.Tenancy), + id.Name) +} + +func KeyFromRefAndPort(ref *pbresource.Reference, port string) string { + return fmt.Sprintf("%s:%s", + resource.ReferenceToString(ref), + port) +} + +func (c *Cache) Write(d *intermediate.CombinedDestinationRef) { + c.lock.Lock() + defer c.lock.Unlock() + + key := KeyFromRefAndPort(d.ServiceRef, d.Port) + + c.store[key] = d + + // Update source proxies index. + for _, proxyID := range d.SourceProxies { + proxyIDKey := KeyFromID(proxyID) + + _, ok := c.sourceProxiesIndex[proxyIDKey] + if !ok { + c.sourceProxiesIndex[proxyIDKey] = make(storeKeys) + } + + c.sourceProxiesIndex[proxyIDKey][key] = struct{}{} + } +} + +func (c *Cache) Delete(ref *pbresource.Reference, port string) { + c.lock.Lock() + defer c.lock.Unlock() + + key := KeyFromRefAndPort(ref, port) + + // First get it from the store. + dest, ok := c.store[key] + if !ok { + // If it's not there, return as there's nothing for us to do. + return + } + + // Update source proxies indices. + for _, proxyID := range dest.SourceProxies { + proxyIDKey := KeyFromID(proxyID) + + // Delete our destination key from this source proxy. + delete(c.sourceProxiesIndex[proxyIDKey], key) + } + + // Finally, delete this destination from the store. + delete(c.store, key) +} + +func (c *Cache) DeleteSourceProxy(id *pbresource.ID) { + c.lock.Lock() + defer c.lock.Unlock() + + proxyIDKey := KeyFromID(id) + + // Get all destination keys. + destKeys := c.sourceProxiesIndex[proxyIDKey] + + for destKey := range destKeys { + // Read destination.
+ dest, ok := c.store[destKey] + if !ok { + // If there's no destination with that key, skip it as there's nothing for us to do. + continue + } + + // Delete the source proxy ID. + delete(dest.SourceProxies, proxyIDKey) + } + + // Finally, delete the index for this proxy. + delete(c.sourceProxiesIndex, proxyIDKey) +} + +func (c *Cache) ReadDestination(ref *pbresource.Reference, port string) *intermediate.CombinedDestinationRef { + c.lock.RLock() + defer c.lock.RUnlock() + + key := KeyFromRefAndPort(ref, port) + return c.store[key] +} + +func (c *Cache) DestinationsBySourceProxy(id *pbresource.ID) []*intermediate.CombinedDestinationRef { + c.lock.RLock() + defer c.lock.RUnlock() + + var destinations []*intermediate.CombinedDestinationRef + + proxyIDKey := KeyFromID(id) + + for destKey := range c.sourceProxiesIndex[proxyIDKey] { + destinations = append(destinations, c.store[destKey]) + } + + return destinations +} diff --git a/internal/mesh/internal/controllers/mesh/cache/cache_test.go b/internal/mesh/internal/controllers/mesh/cache/cache_test.go new file mode 100644 index 0000000000000..7f0dd4b437d44 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/cache/cache_test.go @@ -0,0 +1,168 @@ +package cache + +import ( + "testing" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource/resourcetest" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/stretchr/testify/require" +) + +func TestWrite_Create(t *testing.T) { + cache := New() + + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + destination := testDestination(proxyID) + cache.Write(destination) + + destKey := KeyFromRefAndPort(destination.ServiceRef, destination.Port) + require.Equal(t, destination, cache.store[destKey]) + actualSourceProxies := 
cache.sourceProxiesIndex + expectedSourceProxies := map[string]storeKeys{ + KeyFromID(proxyID): {destKey: struct{}{}}, + } + require.Equal(t, expectedSourceProxies, actualSourceProxies) + + // Check that we can read back the destination successfully. + require.Equal(t, destination, cache.ReadDestination(destination.ServiceRef, destination.Port)) +} + +func TestWrite_Update(t *testing.T) { + cache := New() + + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + destination1 := testDestination(proxyID) + cache.Write(destination1) + + // Add another destination for the same proxy ID. + destination2 := testDestination(proxyID) + destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection() + cache.Write(destination2) + + // Check that the source proxies are updated. + actualSourceProxies := cache.sourceProxiesIndex + expectedSourceProxies := map[string]storeKeys{ + KeyFromID(proxyID): { + KeyFromRefAndPort(destination1.ServiceRef, destination1.Port): struct{}{}, + KeyFromRefAndPort(destination2.ServiceRef, destination2.Port): struct{}{}, + }, + } + require.Equal(t, expectedSourceProxies, actualSourceProxies) + + // Add another destination for a different proxy. 
+ anotherProxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-def").ID() + destination3 := testDestination(anotherProxyID) + destination3.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-3").ReferenceNoSection() + cache.Write(destination3) + + actualSourceProxies = cache.sourceProxiesIndex + expectedSourceProxies = map[string]storeKeys{ + KeyFromID(proxyID): { + KeyFromRefAndPort(destination1.ServiceRef, destination1.Port): struct{}{}, + KeyFromRefAndPort(destination2.ServiceRef, destination2.Port): struct{}{}, + }, + KeyFromID(anotherProxyID): { + KeyFromRefAndPort(destination3.ServiceRef, destination3.Port): struct{}{}, + }, + } + require.Equal(t, expectedSourceProxies, actualSourceProxies) +} + +func TestWrite_Delete(t *testing.T) { + cache := New() + + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + destination1 := testDestination(proxyID) + cache.Write(destination1) + + // Add another destination for the same proxy ID. + destination2 := testDestination(proxyID) + destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection() + cache.Write(destination2) + + cache.Delete(destination1.ServiceRef, destination1.Port) + + require.NotContains(t, cache.store, KeyFromRefAndPort(destination1.ServiceRef, destination1.Port)) + + // Check that the source proxies are updated. + actualSourceProxies := cache.sourceProxiesIndex + expectedSourceProxies := map[string]storeKeys{ + KeyFromID(proxyID): { + KeyFromRefAndPort(destination2.ServiceRef, destination2.Port): struct{}{}, + }, + } + require.Equal(t, expectedSourceProxies, actualSourceProxies) + + // Try to delete non-existing destination and check that nothing has changed.
+ cache.Delete( + resourcetest.Resource(catalog.ServiceType, "does-not-exist").ReferenceNoSection(), + "doesn't-matter") + + require.Contains(t, cache.store, KeyFromRefAndPort(destination2.ServiceRef, destination2.Port)) + require.Equal(t, expectedSourceProxies, cache.sourceProxiesIndex) +} + +func TestDeleteSourceProxy(t *testing.T) { + cache := New() + + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + destination1 := testDestination(proxyID) + cache.Write(destination1) + + // Add another destination for the same proxy ID. + destination2 := testDestination(proxyID) + destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection() + cache.Write(destination2) + + cache.DeleteSourceProxy(proxyID) + + // Check that source proxy index is gone. + proxyKey := KeyFromID(proxyID) + require.NotContains(t, cache.sourceProxiesIndex, proxyKey) + + // Check that the destinations no longer have this proxy as the source. + require.NotContains(t, destination1.SourceProxies, proxyKey) + require.NotContains(t, destination2.SourceProxies, proxyKey) + + // Try to add a non-existent key to source proxy index + cache.sourceProxiesIndex[proxyKey] = map[string]struct{}{"doesn't-exist": {}} + cache.DeleteSourceProxy(proxyID) + + // Check that source proxy index is gone. + require.NotContains(t, cache.sourceProxiesIndex, proxyKey) + + // Check that the destinations no longer have this proxy as the source. + require.NotContains(t, destination1.SourceProxies, proxyKey) + require.NotContains(t, destination2.SourceProxies, proxyKey) +} + +func TestDestinationsBySourceProxy(t *testing.T) { + cache := New() + + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + destination1 := testDestination(proxyID) + cache.Write(destination1) + + // Add another destination for the same proxy ID. 
+ destination2 := testDestination(proxyID) + destination2.ServiceRef = resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection() + cache.Write(destination2) + + actualDestinations := cache.DestinationsBySourceProxy(proxyID) + expectedDestinations := []*intermediate.CombinedDestinationRef{destination1, destination2} + require.ElementsMatch(t, expectedDestinations, actualDestinations) +} + +func testDestination(proxyID *pbresource.ID) *intermediate.CombinedDestinationRef { + return &intermediate.CombinedDestinationRef{ + ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service").ReferenceNoSection(), + Port: "tcp", + ExplicitDestinationsID: resourcetest.Resource(types.UpstreamsType, "test-servicedestinations").ID(), + SourceProxies: map[string]*pbresource.ID{ + KeyFromID(proxyID): proxyID, + }, + } +} diff --git a/internal/mesh/internal/controllers/mesh/controller.go b/internal/mesh/internal/controllers/mesh/controller.go index 84a7b6436b7cf..68e35b221b1de 100644 --- a/internal/mesh/internal/controllers/mesh/controller.go +++ b/internal/mesh/internal/controllers/mesh/controller.go @@ -6,18 +6,16 @@ package mesh import ( "context" - "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/builder" - "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/mappers" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/cache" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/fetcher" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/mapper" "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" "github.com/hashicorp/consul/internal/resource" - pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" - pbmesh 
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbresource" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) @@ -25,100 +23,114 @@ import ( // ControllerName is the name for this controller. It's used for logging or status keys. const ControllerName = "consul.io/mesh-controller" -func Controller() controller.Controller { +type TrustDomainFetcher func() (string, error) + +func Controller(cache *cache.Cache, mapper *mapper.Mapper, trustDomainFetcher TrustDomainFetcher) controller.Controller { + if cache == nil || mapper == nil || trustDomainFetcher == nil { + panic("cache, mapper and trust domain fetcher are required") + } + return controller.ForType(types.ProxyStateTemplateType). - WithWatch(catalog.ServiceEndpointsType, mappers.MapServiceEndpointsToProxyStateTemplate). - WithReconciler(&reconciler{}) + WithWatch(catalog.ServiceEndpointsType, mapper.MapServiceEndpointsToProxyStateTemplate). + WithWatch(types.UpstreamsType, mapper.MapDestinationsToProxyStateTemplate). + WithReconciler(&reconciler{cache: cache, getTrustDomain: trustDomainFetcher}) } -type reconciler struct{} +type reconciler struct { + cache *cache.Cache + getTrustDomain TrustDomainFetcher +} func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error { rt.Logger = rt.Logger.With("resource-id", req.ID, "controller", ControllerName) rt.Logger.Trace("reconciling proxy state template", "id", req.ID) - // Check if the workload exists. - workloadID := workloadIDFromProxyStateTemplate(req.ID) - rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: workloadID}) + // Instantiate a data fetcher to fetch all reconciliation data. 
+ dataFetcher := fetcher.Fetcher{Client: rt.Client, Cache: r.cache} - switch { - case status.Code(err) == codes.NotFound: - // If workload has been deleted, then return as ProxyStateTemplate should be cleaned up + // Check if the apiWorkload exists. + workloadID := resource.ReplaceType(catalog.WorkloadType, req.ID) + workload, err := dataFetcher.FetchWorkload(ctx, resource.ReplaceType(catalog.WorkloadType, req.ID)) + if err != nil { + rt.Logger.Error("error reading the associated apiWorkload", "error", err) + return err + } + if workload == nil { + // If apiWorkload has been deleted, then return as ProxyStateTemplate should be cleaned up // by the garbage collector because of the owner reference. - rt.Logger.Trace("workload doesn't exist; skipping reconciliation", "workload", workloadID) + rt.Logger.Trace("apiWorkload doesn't exist; skipping reconciliation", "apiWorkload", workloadID) return nil - case err != nil: - rt.Logger.Error("error reading the associated workload", "error", err) - return err } - // Parse the workload data for this proxy. Note that we know that this workload has a service associated with it - // because we only trigger updates off of service endpoints. - workloadRes := rsp.Resource - var workload pbcatalog.Workload - err = workloadRes.Data.UnmarshalTo(&workload) + proxyStateTemplate, err := dataFetcher.FetchProxyStateTemplate(ctx, req.ID) if err != nil { - rt.Logger.Error("error parsing workload data", "workload", workloadRes.Id) - return resource.NewErrDataParse(&workload, err) - } - - rsp, err = rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID}) - var buildNew bool - switch { - case status.Code(err) == codes.NotFound: - // Nothing to do as this resource may not have been created yet. 
- rt.Logger.Trace("proxy state template for this workload doesn't yet exist; generating a new one", "id", req.ID) - buildNew = true - case err != nil: rt.Logger.Error("error reading proxy state template", "error", err) return nil } - if !isMeshEnabled(workload.Ports) { + if proxyStateTemplate == nil { + // If proxy state template has been deleted + rt.Logger.Trace("proxy state template for this apiWorkload doesn't yet exist; generating a new one", "id", req.ID) + } + + if !fetcher.IsMeshEnabled(workload.Workload.Ports) { // Skip non-mesh workloads. // If there's existing proxy state template, delete it. - if !buildNew { - rt.Logger.Trace("deleting existing proxy state template because workload is no longer on the mesh", "id", req.ID) + if proxyStateTemplate != nil { + rt.Logger.Trace("deleting existing proxy state template because apiWorkload is no longer on the mesh", "id", req.ID) _, err = rt.Client.Delete(ctx, &pbresource.DeleteRequest{Id: req.ID}) if err != nil { rt.Logger.Error("error deleting existing proxy state template", "error", err) return err } + + // Remove it from cache. + r.cache.DeleteSourceProxy(req.ID) } - rt.Logger.Trace("skipping proxy state template generation because workload is not on the mesh", "workload", workloadRes.Id) + rt.Logger.Trace("skipping proxy state template generation because apiWorkload is not on the mesh", "apiWorkload", workload.Resource.Id) return nil } - var proxyTemplate pbmesh.ProxyStateTemplate - if !buildNew { - err = rsp.Resource.Data.UnmarshalTo(&proxyTemplate) - if err != nil { - rt.Logger.Error("error parsing proxy state template data", "id", req.ID) - return resource.NewErrDataParse(&proxyTemplate, err) - } + // First get the trust domain. + trustDomain, err := r.getTrustDomain() + if err != nil { + return err + } + + b := builder.New(req.ID, workloadIdentityRefFromWorkload(workload), trustDomain). + BuildLocalApp(workload.Workload) + + // Get all destinationsData. 
+ destinationsRefs := r.cache.DestinationsBySourceProxy(req.ID) + destinationsData, statuses, err := dataFetcher.FetchDestinationsData(ctx, destinationsRefs) + if err != nil { + return err } - b := builder.New(req.ID, workloadIdentityRefFromWorkload(workloadRes.Id)). - AddInboundListener(xdscommon.PublicListenerName, &workload). - AddInboundRouters(&workload). - AddInboundTLS() + b.BuildDestinations(destinationsData) newProxyTemplate := b.Build() - same := proto.Equal(&proxyTemplate, newProxyTemplate) - if buildNew || !same { + var equal bool + if proxyStateTemplate != nil { + equal = proto.Equal(proxyStateTemplate.Tmpl, newProxyTemplate) + } + if proxyStateTemplate == nil || !equal { proxyTemplateData, err := anypb.New(newProxyTemplate) if err != nil { rt.Logger.Error("error creating proxy state template data", "error", err) return err } rt.Logger.Trace("updating proxy state template", "id", req.ID) + if proxyStateTemplate != nil { + rt.Logger.Trace("updatating proxy state template version", "version", proxyStateTemplate.Resource.Version) + } _, err = rt.Client.Write(ctx, &pbresource.WriteRequest{ Resource: &pbresource.Resource{ Id: req.ID, - Owner: workloadRes.Id, + Owner: workload.Resource.Id, Data: proxyTemplateData, }, }) @@ -129,31 +141,31 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c } else { rt.Logger.Trace("proxy state template data has not changed, skipping update", "id", req.ID) } - return nil -} -func workloadIDFromProxyStateTemplate(id *pbresource.ID) *pbresource.ID { - return &pbresource.ID{ - Name: id.Name, - Tenancy: id.Tenancy, - Type: catalog.WorkloadType, + // Update any statuses. 
+ for _, status := range statuses { + updatedStatus := &pbresource.Status{ + ObservedGeneration: status.Generation, + } + updatedStatus.Conditions = status.Conditions + // If the status is unchanged then we should return and avoid the unnecessary write + if !resource.EqualStatus(status.OldStatus[ControllerName], updatedStatus, false) { + _, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{ + Id: status.ID, + Key: ControllerName, + Status: updatedStatus, + }) + if err != nil { + return err + } + } } + return nil } -func workloadIdentityRefFromWorkload(id *pbresource.ID) *pbresource.Reference { +func workloadIdentityRefFromWorkload(w *intermediate.Workload) *pbresource.Reference { return &pbresource.Reference{ - Name: id.Name, - Tenancy: id.Tenancy, - } -} - -// isMeshEnabled returns true if workload or service endpoints port -// contain a port with the "mesh" protocol. -func isMeshEnabled(ports map[string]*pbcatalog.WorkloadPort) bool { - for _, port := range ports { - if port.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { - return true - } + Name: w.Workload.Identity, + Tenancy: w.Resource.Id.Tenancy, } - return false } diff --git a/internal/mesh/internal/controllers/mesh/controller_test.go b/internal/mesh/internal/controllers/mesh/controller_test.go index cd5f82e3fbc93..b32aaf47c8a31 100644 --- a/internal/mesh/internal/controllers/mesh/controller_test.go +++ b/internal/mesh/internal/controllers/mesh/controller_test.go @@ -8,14 +8,18 @@ import ( "testing" svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" - "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/builder" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/cache" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/mapper" + 
"github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/status" "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource/resourcetest" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto/private/prototest" "github.com/hashicorp/consul/sdk/testutil" @@ -30,11 +34,12 @@ type meshControllerTestSuite struct { client *resourcetest.Client runtime controller.Runtime - ctl reconciler + ctl *reconciler ctx context.Context - workloadID *pbresource.ID - workload *pbcatalog.Workload + apiWorkloadID *pbresource.ID + apiWorkload *pbcatalog.Workload + webWorkload *pbcatalog.Workload proxyStateTemplate *pbmesh.ProxyStateTemplate } @@ -44,8 +49,15 @@ func (suite *meshControllerTestSuite) SetupTest() { suite.runtime = controller.Runtime{Client: resourceClient, Logger: testutil.Logger(suite.T())} suite.ctx = testutil.TestContext(suite.T()) - suite.workload = &pbcatalog.Workload{ - Identity: "test-identity", + suite.ctl = &reconciler{ + cache: cache.New(), + getTrustDomain: func() (string, error) { + return "test.consul", nil + }, + } + + suite.apiWorkload = &pbcatalog.Workload{ + Identity: "api-identity", Addresses: []*pbcatalog.WorkloadAddress{ { Host: "10.0.0.1", @@ -57,19 +69,30 @@ func (suite *meshControllerTestSuite) SetupTest() { }, } - suite.workloadID = resourcetest.Resource(catalog.WorkloadType, "test-workload"). - WithData(suite.T(), suite.workload). + suite.apiWorkloadID = resourcetest.Resource(catalog.WorkloadType, "api-abc"). + WithData(suite.T(), suite.apiWorkload). 
Write(suite.T(), resourceClient).Id + suite.webWorkload = &pbcatalog.Workload{ + Identity: "web-identity", + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "10.0.0.2", + }, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "mesh": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + } + identityRef := &pbresource.Reference{ - Name: suite.workload.Identity, - Tenancy: suite.workloadID.Tenancy, + Name: suite.apiWorkload.Identity, + Tenancy: suite.apiWorkloadID.Tenancy, } - suite.proxyStateTemplate = builder.New(suite.workloadID, identityRef). - AddInboundListener(xdscommon.PublicListenerName, suite.workload). - AddInboundRouters(suite.workload). - AddInboundTLS(). + suite.proxyStateTemplate = builder.New(suite.apiWorkloadID, identityRef, "test.consul"). + BuildLocalApp(suite.apiWorkload). Build() } @@ -96,41 +119,41 @@ func (suite *meshControllerTestSuite) TestReconcile_NonMeshWorkload() { }, } - resourcetest.Resource(catalog.WorkloadType, "test-non-mesh-workload"). + resourcetest.Resource(catalog.WorkloadType, "test-non-mesh-apiWorkload"). WithData(suite.T(), nonMeshWorkload). 
Write(suite.T(), suite.client.ResourceServiceClient) err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ - ID: resourceID(types.ProxyStateTemplateType, "test-non-mesh-workload"), + ID: resourceID(types.ProxyStateTemplateType, "test-non-mesh-apiWorkload"), }) require.NoError(suite.T(), err) - suite.client.RequireResourceNotFound(suite.T(), resourceID(types.ProxyStateTemplateType, "test-non-mesh-workload")) + suite.client.RequireResourceNotFound(suite.T(), resourceID(types.ProxyStateTemplateType, "test-non-mesh-apiWorkload")) } func (suite *meshControllerTestSuite) TestReconcile_NoExistingProxyStateTemplate() { err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ - ID: resourceID(types.ProxyStateTemplateType, suite.workloadID.Name), + ID: resourceID(types.ProxyStateTemplateType, suite.apiWorkloadID.Name), }) require.NoError(suite.T(), err) - res := suite.client.RequireResourceExists(suite.T(), resourceID(types.ProxyStateTemplateType, suite.workloadID.Name)) + res := suite.client.RequireResourceExists(suite.T(), resourceID(types.ProxyStateTemplateType, suite.apiWorkloadID.Name)) require.NoError(suite.T(), err) require.NotNil(suite.T(), res.Data) - prototest.AssertDeepEqual(suite.T(), suite.workloadID, res.Owner) + prototest.AssertDeepEqual(suite.T(), suite.apiWorkloadID, res.Owner) } func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_WithUpdates() { // Write the original. - resourcetest.Resource(types.ProxyStateTemplateType, "test-workload"). + resourcetest.Resource(types.ProxyStateTemplateType, "api-abc"). WithData(suite.T(), suite.proxyStateTemplate). - WithOwner(suite.workloadID). + WithOwner(suite.apiWorkloadID). Write(suite.T(), suite.client.ResourceServiceClient) - // Update the workload. - suite.workload.Ports["mesh"].Port = 21000 - updatedWorkloadID := resourcetest.Resource(catalog.WorkloadType, "test-workload"). - WithData(suite.T(), suite.workload). 
+ // Update the apiWorkload. + suite.apiWorkload.Ports["mesh"].Port = 21000 + updatedWorkloadID := resourcetest.Resource(catalog.WorkloadType, "api-abc"). + WithData(suite.T(), suite.apiWorkload). Write(suite.T(), suite.client.ResourceServiceClient).Id err := suite.ctl.Reconcile(context.Background(), suite.runtime, controller.Request{ @@ -148,20 +171,21 @@ func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_W require.NoError(suite.T(), err) // Check that our value is updated. - inboundListenerPort := updatedProxyStateTemplate.ProxyState.Listeners[0].BindAddress.(*pbmesh.Listener_IpPort).IpPort.Port + inboundListenerPort := updatedProxyStateTemplate.ProxyState.Listeners[0]. + BindAddress.(*pbproxystate.Listener_HostPort).HostPort.Port require.Equal(suite.T(), uint32(21000), inboundListenerPort) } func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_NoUpdates() { - // Write the original - originalProxyState := resourcetest.Resource(types.ProxyStateTemplateType, "test-workload"). + // Write the original. + originalProxyState := resourcetest.Resource(types.ProxyStateTemplateType, "api-abc"). WithData(suite.T(), suite.proxyStateTemplate). - WithOwner(suite.workloadID). + WithOwner(suite.apiWorkloadID). Write(suite.T(), suite.client.ResourceServiceClient) - // Update the metadata on the workload which should result in no changes. - updatedWorkloadID := resourcetest.Resource(catalog.WorkloadType, "test-workload"). - WithData(suite.T(), suite.workload). + // Update the metadata on the apiWorkload which should result in no changes. + updatedWorkloadID := resourcetest.Resource(catalog.WorkloadType, "api-abc"). + WithData(suite.T(), suite.apiWorkload). WithMeta("some", "meta"). 
Write(suite.T(), suite.client.ResourceServiceClient).Id @@ -170,71 +194,143 @@ func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_N }) require.NoError(suite.T(), err) - updatedProxyState := suite.client.RequireResourceExists(suite.T(), resourceID(types.ProxyStateTemplateType, suite.workloadID.Name)) + updatedProxyState := suite.client.RequireResourceExists(suite.T(), resourceID(types.ProxyStateTemplateType, suite.apiWorkloadID.Name)) resourcetest.RequireVersionUnchanged(suite.T(), updatedProxyState, originalProxyState.Version) } -// delete the workload, check that proxy state gets deleted (?can we check that?) func (suite *meshControllerTestSuite) TestController() { // Run the controller manager mgr := controller.NewManager(suite.client, suite.runtime.Logger) - mgr.Register(Controller()) + c := cache.New() + m := &mapper.Mapper{Cache: c} + + mgr.Register(Controller(c, m, func() (string, error) { + return "test.consul", nil + })) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) - proxyStateTemplateID := resourcetest.Resource(types.ProxyStateTemplateType, "test-workload").ID() - // Add a mesh workload and check that it gets reconciled. - resourcetest.Resource(catalog.WorkloadType, "test-workload"). - WithData(suite.T(), suite.workload). + apiProxyStateTemplateID := resourcetest.Resource(types.ProxyStateTemplateType, "api-abc").ID() + // Add a mesh apiWorkload and check that it gets reconciled. + resourcetest.Resource(catalog.WorkloadType, "api-abc"). + WithData(suite.T(), suite.apiWorkload). Write(suite.T(), suite.client.ResourceServiceClient) - resourcetest.Resource(catalog.ServiceType, "test-service"). + apiService := resourcetest.Resource(catalog.ServiceType, "api-service"). 
WithData(suite.T(), &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{Names: []string{"test-workload"}}, + Workloads: &pbcatalog.WorkloadSelector{Names: []string{"api-abc"}}, Ports: []*pbcatalog.ServicePort{ - {TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + {TargetPort: "tcp", Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, }}). Write(suite.T(), suite.client.ResourceServiceClient) - endpoints := &pbcatalog.ServiceEndpoints{ + apiEndpoints := &pbcatalog.ServiceEndpoints{ Endpoints: []*pbcatalog.Endpoint{ { - TargetRef: suite.workloadID, - Addresses: suite.workload.Addresses, - Ports: suite.workload.Ports, + TargetRef: suite.apiWorkloadID, + Addresses: suite.apiWorkload.Addresses, + Ports: suite.apiWorkload.Ports, + Identity: "api-identity", }, }, } - resourcetest.Resource(catalog.ServiceEndpointsType, "test-service"). - WithData(suite.T(), endpoints). + resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). + WithData(suite.T(), apiEndpoints). Write(suite.T(), suite.client.ResourceServiceClient) // Check that proxy state template resource is generated. - var proxyStateTmpl *pbresource.Resource + var apiProxyStateTmpl *pbresource.Resource + retry.Run(suite.T(), func(r *retry.R) { + apiProxyStateTmpl = suite.client.RequireResourceExists(r, apiProxyStateTemplateID) + }) + + // Add a source service and check that a new proxy state is generated. + webProxyStateTemplateID := resourcetest.Resource(types.ProxyStateTemplateType, "web-def").ID() + webWorkload := resourcetest.Resource(catalog.WorkloadType, "web-def"). + WithData(suite.T(), suite.webWorkload). + Write(suite.T(), suite.client) + resourcetest.Resource(catalog.ServiceType, "web"). + WithData(suite.T(), &pbcatalog.Service{ + Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}}, + Ports: []*pbcatalog.ServicePort{ + {TargetPort: "tcp", Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + {TargetPort: "mesh", Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }}). 
+ Write(suite.T(), suite.client) + resourcetest.Resource(catalog.ServiceEndpointsType, "web"). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + TargetRef: webWorkload.Id, + Addresses: suite.webWorkload.Addresses, + Ports: suite.webWorkload.Ports, + Identity: "web-identity", + }, + }, + }).Write(suite.T(), suite.client) + webDestinations := resourcetest.Resource(types.UpstreamsType, "web-destinations"). + WithData(suite.T(), &pbmesh.Upstreams{ + Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}}, + Upstreams: []*pbmesh.Upstream{ + { + DestinationRef: resource.Reference(apiService.Id, ""), + DestinationPort: "tcp", + }, + }, + }).Write(suite.T(), suite.client) + var webProxyStateTemplate *pbresource.Resource retry.Run(suite.T(), func(r *retry.R) { - proxyStateTmpl = suite.client.RequireResourceExists(r, proxyStateTemplateID) + webProxyStateTemplate = suite.client.RequireResourceExists(r, webProxyStateTemplateID) }) + // Update destination's service apiEndpoints to be non-mesh and check that we get a new web proxy resource re-generated + // and that the status on Upstreams resource is updated with a validation error. + delete(apiEndpoints.Endpoints[0].Ports, "mesh") + resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). + WithData(suite.T(), apiEndpoints). + Write(suite.T(), suite.client.ResourceServiceClient) + serviceRef := cache.KeyFromRefAndPort(resource.Reference(apiService.Id, ""), "tcp") + //destinationRef := cache.KeyFromID(webDestinations.Id) + + suite.client.WaitForStatusCondition(suite.T(), webDestinations.Id, ControllerName, + status.ConditionNonMeshDestination(serviceRef)) + + // We should also get a new web proxy template resource as this destination should be removed. 
+ webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + + // Update destination's service apiEndpoints back to mesh and check that we get a new web proxy resource re-generated + // and that the status on Upstreams resource is updated to be empty. + apiEndpoints.Endpoints[0].Ports["mesh"] = &pbcatalog.WorkloadPort{Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH} + resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). + WithData(suite.T(), apiEndpoints). + Write(suite.T(), suite.client.ResourceServiceClient) + + suite.client.WaitForStatusCondition(suite.T(), webDestinations.Id, ControllerName, + status.ConditionMeshDestination(serviceRef)) + + // We should also get a new web proxy template resource as this destination should be added again. + suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + // Delete the proxy state template resource and check that it gets regenerated. - _, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: proxyStateTemplateID}) + _, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: apiProxyStateTemplateID}) require.NoError(suite.T(), err) - suite.client.WaitForNewVersion(suite.T(), proxyStateTemplateID, proxyStateTmpl.Version) + suite.client.WaitForNewVersion(suite.T(), apiProxyStateTemplateID, apiProxyStateTmpl.Version) - // Update workload and service endpoints to not be on the mesh anymore + // Update apiWorkload and service apiEndpoints to not be on the mesh anymore // and check that the proxy state template is deleted. - delete(suite.workload.Ports, "mesh") - resourcetest.Resource(catalog.WorkloadType, "test-workload"). - WithData(suite.T(), suite.workload). + delete(suite.apiWorkload.Ports, "mesh") + resourcetest.Resource(catalog.WorkloadType, "api-abc"). + WithData(suite.T(), suite.apiWorkload). 
Write(suite.T(), suite.client.ResourceServiceClient) - delete(endpoints.Endpoints[0].Ports, "mesh") - resourcetest.Resource(catalog.ServiceEndpointsType, "test-service"). - WithData(suite.T(), endpoints). + delete(apiEndpoints.Endpoints[0].Ports, "mesh") + resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). + WithData(suite.T(), apiEndpoints). Write(suite.T(), suite.client.ResourceServiceClient) retry.Run(suite.T(), func(r *retry.R) { - suite.client.RequireResourceNotFound(r, proxyStateTemplateID) + suite.client.RequireResourceNotFound(r, apiProxyStateTemplateID) }) } diff --git a/internal/mesh/internal/controllers/mesh/fetcher/data_fetcher.go b/internal/mesh/internal/controllers/mesh/fetcher/data_fetcher.go new file mode 100644 index 0000000000000..ddbebd1ad3560 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/fetcher/data_fetcher.go @@ -0,0 +1,244 @@ +package fetcher + +import ( + "context" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/cache" + status2 "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/status" + "github.com/hashicorp/consul/internal/mesh/internal/types" + intermediateTypes "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type Fetcher struct { + Client pbresource.ResourceServiceClient + Cache *cache.Cache +} + +func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Workload, error) { + rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) + + switch { + case status.Code(err) == codes.NotFound: + // We also need to make sure to delete the 
associated proxy from cache. + // We are ignoring errors from cache here as this deletion is best effort. + f.Cache.DeleteSourceProxy(resource.ReplaceType(types.ProxyStateTemplateType, id)) + return nil, nil + case err != nil: + return nil, err + } + + w := &intermediateTypes.Workload{ + Resource: rsp.Resource, + } + + var workload pbcatalog.Workload + err = rsp.Resource.Data.UnmarshalTo(&workload) + if err != nil { + return nil, resource.NewErrDataParse(&workload, err) + } + + w.Workload = &workload + return w, nil +} + +func (f *Fetcher) FetchProxyStateTemplate(ctx context.Context, id *pbresource.ID) (*intermediateTypes.ProxyStateTemplate, error) { + rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) + + switch { + case status.Code(err) == codes.NotFound: + return nil, nil + case err != nil: + return nil, err + } + + p := &intermediateTypes.ProxyStateTemplate{ + Resource: rsp.Resource, + } + + var tmpl pbmesh.ProxyStateTemplate + err = rsp.Resource.Data.UnmarshalTo(&tmpl) + if err != nil { + return nil, resource.NewErrDataParse(&tmpl, err) + } + + p.Tmpl = &tmpl + return p, nil +} + +func (f *Fetcher) FetchServiceEndpoints(ctx context.Context, id *pbresource.ID) (*intermediateTypes.ServiceEndpoints, error) { + rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) + + switch { + case status.Code(err) == codes.NotFound: + return nil, nil + case err != nil: + return nil, err + } + + se := &intermediateTypes.ServiceEndpoints{ + Resource: rsp.Resource, + } + + var endpoints pbcatalog.ServiceEndpoints + err = rsp.Resource.Data.UnmarshalTo(&endpoints) + if err != nil { + return nil, resource.NewErrDataParse(&endpoints, err) + } + + se.Endpoints = &endpoints + return se, nil +} + +func (f *Fetcher) FetchDestinations(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Destinations, error) { + rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) + + switch { + case status.Code(err) == codes.NotFound: + return nil, nil + case err != 
nil: + return nil, err + } + + u := &intermediateTypes.Destinations{ + Resource: rsp.Resource, + } + + var destinations pbmesh.Upstreams + err = rsp.Resource.Data.UnmarshalTo(&destinations) + if err != nil { + return nil, resource.NewErrDataParse(&destinations, err) + } + + u.Destinations = &destinations + return u, nil +} + +func (f *Fetcher) FetchDestinationsData( + ctx context.Context, + destinationRefs []*intermediateTypes.CombinedDestinationRef, +) ([]*intermediateTypes.Destination, map[string]*intermediateTypes.Status, error) { + + var destinations []*intermediateTypes.Destination + statuses := make(map[string]*intermediateTypes.Status) + for _, dest := range destinationRefs { + // Fetch Destinations resource if there is one. + us, err := f.FetchDestinations(ctx, dest.ExplicitDestinationsID) + if err != nil { + // If there's an error, return and force another reconcile instead of computing + // partial proxy state. + return nil, statuses, err + } + + if us == nil { + // If the Destinations resource is not found, then we should delete it from cache and continue. + f.Cache.Delete(dest.ServiceRef, dest.Port) + continue + } + + u := &intermediateTypes.Destination{} + // As Destinations resource contains a list of destinations, + // we need to find the one that references our service and port. + u.Explicit = findDestination(dest.ServiceRef, dest.Port, us.Destinations) + + // Fetch ServiceEndpoints. + serviceID := resource.IDFromReference(dest.ServiceRef) + se, err := f.FetchServiceEndpoints(ctx, resource.ReplaceType(catalog.ServiceEndpointsType, serviceID)) + if err != nil { + return nil, statuses, err + } + + serviceRef := cache.KeyFromRefAndPort(dest.ServiceRef, dest.Port) + upstreamsRef := cache.KeyFromID(us.Resource.Id) + if se == nil { + // If the Service Endpoints resource is not found, then we update the status of the Upstreams resource + // but not remove it from cache in case it comes back. 
+ updateStatusCondition(statuses, upstreamsRef, dest.ExplicitDestinationsID, + us.Resource.Status, us.Resource.Generation, status2.ConditionDestinationServiceNotFound(serviceRef)) + continue + } else { + updateStatusCondition(statuses, upstreamsRef, dest.ExplicitDestinationsID, + us.Resource.Status, us.Resource.Generation, status2.ConditionDestinationServiceFound(serviceRef)) + } + + u.ServiceEndpoints = se + + // Check if these endpoints are mesh-enabled. If not, record a status condition and skip this destination. + if !IsMeshEnabled(se.Endpoints.Endpoints[0].Ports) { + // Add invalid status but don't remove from cache. If this state changes, + // we want to be able to detect this change. + updateStatusCondition(statuses, upstreamsRef, dest.ExplicitDestinationsID, + us.Resource.Status, us.Resource.Generation, status2.ConditionNonMeshDestination(serviceRef)) + + // This error should not cause the execution to stop, as we want to make sure that this non-mesh destination + // gets removed from the proxy state. + continue + } else { + // If everything was successful, add an empty condition so that we can remove any existing statuses. + updateStatusCondition(statuses, upstreamsRef, dest.ExplicitDestinationsID, + us.Resource.Status, us.Resource.Generation, status2.ConditionMeshDestination(serviceRef)) + } + + // Gather all identities. + if se != nil { + var identities []*pbresource.Reference + for _, ep := range se.Endpoints.Endpoints { + identities = append(identities, &pbresource.Reference{ + Name: ep.Identity, + Tenancy: se.Resource.Id.Tenancy, + }) + } + u.Identities = identities + } + + destinations = append(destinations, u) + } + + return destinations, statuses, nil +} + +// IsMeshEnabled returns true if the workload or service endpoints ports +// contain a port with the "mesh" protocol. 
+func IsMeshEnabled(ports map[string]*pbcatalog.WorkloadPort) bool { + for _, port := range ports { + if port.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + return true + } + } + return false +} + +func findDestination(ref *pbresource.Reference, port string, destinations *pbmesh.Upstreams) *pbmesh.Upstream { + for _, destination := range destinations.Upstreams { + if resource.EqualReference(ref, destination.DestinationRef) && + port == destination.DestinationPort { + return destination + } + } + return nil +} + +func updateStatusCondition( + statuses map[string]*intermediateTypes.Status, + key string, + id *pbresource.ID, + oldStatus map[string]*pbresource.Status, + generation string, + condition *pbresource.Condition) { + if _, ok := statuses[key]; ok { + statuses[key].Conditions = append(statuses[key].Conditions, condition) + } else { + statuses[key] = &intermediateTypes.Status{ + ID: id, + Generation: generation, + Conditions: []*pbresource.Condition{condition}, + OldStatus: oldStatus, + } + } +} diff --git a/internal/mesh/internal/controllers/mesh/fetcher/data_fetcher_test.go b/internal/mesh/internal/controllers/mesh/fetcher/data_fetcher_test.go new file mode 100644 index 0000000000000..801be7e6f00fd --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/fetcher/data_fetcher_test.go @@ -0,0 +1,546 @@ +package fetcher + +import ( + "context" + "testing" + + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/cache" + meshStatus "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/status" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" + 
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type dataFetcherSuite struct { + suite.Suite + + ctx context.Context + client pbresource.ResourceServiceClient + rt controller.Runtime + + api1Service *pbresource.Resource + api2Service *pbresource.Resource + api1ServiceEndpoints *pbresource.Resource + api1ServiceEndpointsData *pbcatalog.ServiceEndpoints + api2ServiceEndpoints *pbresource.Resource + api2ServiceEndpointsData *pbcatalog.ServiceEndpoints + webDestinations *pbresource.Resource + webDestinationsData *pbmesh.Upstreams + webProxy *pbresource.Resource + webWorkload *pbresource.Resource +} + +func TestIsMeshEnabled(t *testing.T) { + cases := map[string]struct { + ports map[string]*pbcatalog.WorkloadPort + exp bool + }{ + "nil ports": { + ports: nil, + exp: false, + }, + "empty ports": { + ports: make(map[string]*pbcatalog.WorkloadPort), + exp: false, + }, + "no mesh ports": { + ports: map[string]*pbcatalog.WorkloadPort{ + "p1": {Port: 1000, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + "p2": {Port: 2000, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + }, + exp: false, + }, + "one mesh port": { + ports: map[string]*pbcatalog.WorkloadPort{ + "p1": {Port: 1000, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + "p2": {Port: 2000, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "p3": {Port: 3000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + exp: true, + }, + "multiple mesh port": { + ports: map[string]*pbcatalog.WorkloadPort{ + "p1": {Port: 1000, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, + "p2": {Port: 2000, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "p3": {Port: 
3000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + "p4": {Port: 4000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + exp: true, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + require.Equal(t, c.exp, IsMeshEnabled(c.ports)) + }) + } +} + +func (suite *dataFetcherSuite) SetupTest() { + suite.ctx = testutil.TestContext(suite.T()) + suite.client = svctest.RunResourceService(suite.T(), types.Register, catalog.RegisterTypes) + suite.rt = controller.Runtime{ + Client: suite.client, + Logger: testutil.Logger(suite.T()), + } + + suite.api1Service = resourcetest.Resource(catalog.ServiceType, "api-1"). + WithData(suite.T(), &pbcatalog.Service{}). + Write(suite.T(), suite.client) + + suite.api1ServiceEndpointsData = &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "mesh": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + Identity: "api-1-identity", + }, + }, + } + suite.api1ServiceEndpoints = resourcetest.Resource(catalog.ServiceEndpointsType, "api-1"). + WithData(suite.T(), suite.api1ServiceEndpointsData).Write(suite.T(), suite.client) + + suite.api2Service = resourcetest.Resource(catalog.ServiceType, "api-2"). + WithData(suite.T(), &pbcatalog.Service{}). 
+ Write(suite.T(), suite.client) + + suite.api2ServiceEndpointsData = &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.2"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp1": {Port: 9080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "tcp2": {Port: 9081, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "mesh": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + Identity: "api-2-identity", + }, + }, + } + suite.api2ServiceEndpoints = resourcetest.Resource(catalog.ServiceEndpointsType, "api-2"). + WithData(suite.T(), suite.api2ServiceEndpointsData).Write(suite.T(), suite.client) + + suite.webDestinationsData = &pbmesh.Upstreams{ + Upstreams: []*pbmesh.Upstream{ + { + DestinationRef: resource.Reference(suite.api1Service.Id, ""), + DestinationPort: "tcp", + }, + { + DestinationRef: resource.Reference(suite.api2Service.Id, ""), + DestinationPort: "tcp1", + }, + { + DestinationRef: resource.Reference(suite.api2Service.Id, ""), + DestinationPort: "tcp2", + }, + }, + } + + suite.webDestinations = resourcetest.Resource(types.UpstreamsType, "web-destinations"). + WithData(suite.T(), suite.webDestinationsData). + Write(suite.T(), suite.client) + + suite.webProxy = resourcetest.Resource(types.ProxyStateTemplateType, "web-abc"). + WithData(suite.T(), &pbmesh.ProxyStateTemplate{}). + Write(suite.T(), suite.client) + + suite.webWorkload = resourcetest.Resource(catalog.WorkloadType, "web-abc"). + WithData(suite.T(), &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.2"}}, + Ports: map[string]*pbcatalog.WorkloadPort{"tcp": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}}, + }). + Write(suite.T(), suite.client) +} + +func (suite *dataFetcherSuite) TestFetcher_FetchWorkload_WorkloadNotFound() { + proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() + + // Create cache and pre-populate it. 
+ c := cache.New() + dest1 := &intermediate.CombinedDestinationRef{ + ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service-1").ReferenceNoSection(), + Port: "tcp", + ExplicitDestinationsID: resourcetest.Resource(types.UpstreamsType, "test-servicedestinations-1").ID(), + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(proxyID): proxyID, + }, + } + dest2 := &intermediate.CombinedDestinationRef{ + ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service-2").ReferenceNoSection(), + Port: "tcp", + ExplicitDestinationsID: resourcetest.Resource(types.UpstreamsType, "test-servicedestinations-2").ID(), + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(proxyID): proxyID, + }, + } + c.Write(dest1) + c.Write(dest2) + + f := Fetcher{Cache: c, Client: suite.client} + _, err := f.FetchWorkload(context.Background(), proxyID) + require.NoError(suite.T(), err) + + // Check that cache is updated to remove proxy id. + require.Nil(suite.T(), c.DestinationsBySourceProxy(proxyID)) +} + +func (suite *dataFetcherSuite) TestFetcher_NotFound() { + f := Fetcher{ + Client: suite.client, + } + + cases := map[string]struct { + typ *pbresource.Type + fetchFunc func(id *pbresource.ID) error + }{ + "proxy state template": { + typ: types.ProxyStateTemplateType, + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchProxyStateTemplate(context.Background(), id) + return err + }, + }, + "service endpoints": { + typ: catalog.ServiceEndpointsType, + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchServiceEndpoints(context.Background(), id) + return err + }, + }, + "destinations": { + typ: types.UpstreamsType, + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchDestinations(context.Background(), id) + return err + }, + }, + } + + for name, c := range cases { + suite.T().Run(name, func(t *testing.T) { + err := c.fetchFunc(resourcetest.Resource(c.typ, "not-found").ID()) + require.NoError(t, err) + }) + } +} + +func (suite 
*dataFetcherSuite) TestFetcher_FetchErrors() { + f := Fetcher{ + Client: suite.client, + } + + cases := map[string]struct { + name string + fetchFunc func(id *pbresource.ID) error + }{ + "workload": { + name: "web-abc", + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchWorkload(context.Background(), id) + return err + }, + }, + "proxy state template": { + name: "web-abc", + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchProxyStateTemplate(context.Background(), id) + return err + }, + }, + "service endpoints": { + name: "api-1", + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchServiceEndpoints(context.Background(), id) + return err + }, + }, + "destinations": { + name: "web-destinations", + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchDestinations(context.Background(), id) + return err + }, + }, + } + + for name, c := range cases { + suite.T().Run(name+"-read", func(t *testing.T) { + badType := &pbresource.Type{ + Group: "not", + Kind: "found", + GroupVersion: "vfake", + } + err := c.fetchFunc(resourcetest.Resource(badType, c.name).ID()) + require.Error(t, err) + require.Equal(t, codes.InvalidArgument, status.Code(err)) + }) + + suite.T().Run(name+"-unmarshal", func(t *testing.T) { + // Create a dummy health checks type as it won't be any of the types mesh controller cares about + resourcetest.Resource(catalog.HealthChecksType, c.name). + WithData(suite.T(), &pbcatalog.HealthChecks{ + Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-abc"}}, + }). 
+ Write(suite.T(), suite.client) + + err := c.fetchFunc(resourcetest.Resource(catalog.HealthChecksType, c.name).ID()) + require.Error(t, err) + var parseErr resource.ErrDataParse + require.ErrorAs(t, err, &parseErr) + }) + } +} + +func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { + destination1 := &intermediate.CombinedDestinationRef{ + ServiceRef: resource.Reference(suite.api1Service.Id, ""), + Port: "tcp", + ExplicitDestinationsID: suite.webDestinations.Id, + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(suite.webProxy.Id): suite.webProxy.Id, + }, + } + destination2 := &intermediate.CombinedDestinationRef{ + ServiceRef: resource.Reference(suite.api2Service.Id, ""), + Port: "tcp1", + ExplicitDestinationsID: suite.webDestinations.Id, + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(suite.webProxy.Id): suite.webProxy.Id, + }, + } + destination3 := &intermediate.CombinedDestinationRef{ + ServiceRef: resource.Reference(suite.api2Service.Id, ""), + Port: "tcp2", + ExplicitDestinationsID: suite.webDestinations.Id, + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(suite.webProxy.Id): suite.webProxy.Id, + }, + } + destinationRefNoDestinations := &intermediate.CombinedDestinationRef{ + ServiceRef: resource.Reference(suite.api1Service.Id, ""), + Port: "tcp", + ExplicitDestinationsID: resourcetest.Resource(types.UpstreamsType, "not-found").ID(), + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(suite.webProxy.Id): suite.webProxy.Id, + }, + } + notFoundServiceRef := resourcetest.Resource(catalog.ServiceType, "not-found").ReferenceNoSection() + destinationNoServiceEndpoints := &intermediate.CombinedDestinationRef{ + ServiceRef: notFoundServiceRef, + Port: "tcp", + ExplicitDestinationsID: suite.webDestinations.Id, + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(suite.webProxy.Id): suite.webProxy.Id, + }, + } + apiNonMeshServiceEndpointsData := &pbcatalog.ServiceEndpoints{ + Endpoints: 
[]*pbcatalog.Endpoint{ + { + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + }, + Identity: "api-1-identity", + }, + }, + } + apiNonMeshServiceEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-1"). + WithData(suite.T(), apiNonMeshServiceEndpointsData).Write(suite.T(), suite.client) + destinationNonMeshServiceEndpoints := &intermediate.CombinedDestinationRef{ + ServiceRef: resource.Reference(apiNonMeshServiceEndpoints.Owner, ""), + Port: "tcp", + ExplicitDestinationsID: suite.webDestinations.Id, + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(suite.webProxy.Id): suite.webProxy.Id, + }, + } + + c := cache.New() + c.Write(destination1) + c.Write(destination2) + c.Write(destination3) + c.Write(destinationRefNoDestinations) + c.Write(destinationNoServiceEndpoints) + + f := Fetcher{ + Cache: c, + Client: suite.client, + } + + suite.T().Run("destinations not found", func(t *testing.T) { + destinationRefs := []*intermediate.CombinedDestinationRef{destinationRefNoDestinations} + destinations, _, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + require.NoError(t, err) + require.Nil(t, destinations) + require.Nil(t, c.ReadDestination(destinationRefNoDestinations.ServiceRef, destinationRefNoDestinations.Port)) + }) + + suite.T().Run("service endpoints not found", func(t *testing.T) { + destinationRefs := []*intermediate.CombinedDestinationRef{destinationNoServiceEndpoints} + destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + require.NoError(t, err) + require.Nil(t, destinations) + + destinationRef := cache.KeyFromID(destinationNoServiceEndpoints.ExplicitDestinationsID) + serviceRef := cache.KeyFromRefAndPort(destinationNoServiceEndpoints.ServiceRef, destinationNoServiceEndpoints.Port) + + require.Len(t, statuses[destinationRef].Conditions, 1) + require.Equal(t, 
statuses[destinationRef].Conditions[0], + meshStatus.ConditionDestinationServiceNotFound(serviceRef)) + + require.NotNil(t, c.ReadDestination(destinationNoServiceEndpoints.ServiceRef, destinationNoServiceEndpoints.Port)) + }) + + suite.T().Run("service endpoints not on mesh", func(t *testing.T) { + destinationRefs := []*intermediate.CombinedDestinationRef{destinationNonMeshServiceEndpoints} + destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + require.NoError(t, err) + require.Nil(t, destinations) + + destinationRef := cache.KeyFromID(destinationNonMeshServiceEndpoints.ExplicitDestinationsID) + serviceRef := cache.KeyFromRefAndPort(destinationNonMeshServiceEndpoints.ServiceRef, destinationNonMeshServiceEndpoints.Port) + + require.Len(t, statuses[destinationRef].Conditions, 2) + prototest.AssertElementsMatch(t, statuses[destinationRef].Conditions, + []*pbresource.Condition{ + meshStatus.ConditionDestinationServiceFound(serviceRef), + meshStatus.ConditionNonMeshDestination(serviceRef), + }) + + require.NotNil(t, c.ReadDestination(destinationNoServiceEndpoints.ServiceRef, destinationNoServiceEndpoints.Port)) + }) + + suite.T().Run("invalid destinations", func(t *testing.T) { + // Update api1 to no longer be on the mesh. + suite.api1ServiceEndpoints = resourcetest.Resource(catalog.ServiceEndpointsType, "api-1"). 
+ WithData(suite.T(), &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + }, + Identity: "api-1-identity", + }, + }, + }).Write(suite.T(), suite.client) + + destinationRefs := []*intermediate.CombinedDestinationRef{destination1} + destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + require.NoError(t, err) + require.Len(t, statuses, 1) + + serviceRef := cache.KeyFromRefAndPort(destination1.ServiceRef, destination1.Port) + destinationRef := cache.KeyFromID(destination1.ExplicitDestinationsID) + expectedStatus := &intermediate.Status{ + ID: suite.webDestinations.Id, + Generation: suite.webDestinations.Generation, + Conditions: []*pbresource.Condition{ + meshStatus.ConditionDestinationServiceFound(serviceRef), + meshStatus.ConditionNonMeshDestination(serviceRef), + }, + } + prototest.AssertDeepEqual(t, expectedStatus, statuses[destinationRef]) + require.Nil(t, destinations) + require.Nil(t, c.ReadDestination(destination1.ServiceRef, destination1.Port)) + + // Update the endpoints to be mesh enabled again and check that the status is empty. + suite.api1ServiceEndpoints = resourcetest.Resource(catalog.ServiceEndpointsType, "api-1"). 
+ WithData(suite.T(), suite.api1ServiceEndpointsData).Write(suite.T(), suite.client) + destinations, statuses, err = f.FetchDestinationsData(suite.ctx, destinationRefs) + require.NoError(t, err) + require.Len(t, statuses, 1) + expectedStatus = &intermediate.Status{ + ID: suite.webDestinations.Id, + Generation: suite.webDestinations.Generation, + Conditions: []*pbresource.Condition{ + meshStatus.ConditionDestinationServiceFound(serviceRef), + meshStatus.ConditionMeshDestination(serviceRef), + }, + } + prototest.AssertDeepEqual(t, expectedStatus, statuses[destinationRef]) + }) + + suite.T().Run("happy path", func(t *testing.T) { + destinations := []*intermediate.CombinedDestinationRef{destination1, destination2, destination3} + expectedDestinations := []*intermediate.Destination{ + { + Explicit: suite.webDestinationsData.Upstreams[0], + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: suite.api1ServiceEndpoints, + Endpoints: suite.api1ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-1-identity", + Tenancy: suite.api1Service.Id.Tenancy, + }, + }, + }, + { + Explicit: suite.webDestinationsData.Upstreams[1], + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: suite.api2ServiceEndpoints, + Endpoints: suite.api2ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-2-identity", + Tenancy: suite.api2Service.Id.Tenancy, + }, + }, + }, + { + Explicit: suite.webDestinationsData.Upstreams[2], + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: suite.api2ServiceEndpoints, + Endpoints: suite.api2ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-2-identity", + Tenancy: suite.api2Service.Id.Tenancy, + }, + }, + }, + } + + actualDestinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinations) + require.NoError(t, err) + + // Check that all statuses have nil conditions. 
+ dref := cache.KeyFromID(destination1.ExplicitDestinationsID) + var expectedConditions []*pbresource.Condition + for _, d := range destinations { + ref := cache.KeyFromRefAndPort(d.ServiceRef, d.Port) + expectedConditions = append(expectedConditions, + meshStatus.ConditionDestinationServiceFound(ref), + meshStatus.ConditionMeshDestination(ref)) + } + prototest.AssertElementsMatch(t, expectedConditions, statuses[dref].Conditions) + + require.Equal(t, len(expectedDestinations), len(actualDestinations)) + prototest.AssertElementsMatch(t, expectedDestinations, actualDestinations) + }) +} + +func TestDataFetcher(t *testing.T) { + suite.Run(t, new(dataFetcherSuite)) +} diff --git a/internal/mesh/internal/controllers/mesh/mapper/destinations_mapper.go b/internal/mesh/internal/controllers/mesh/mapper/destinations_mapper.go new file mode 100644 index 0000000000000..4a12a35e78a86 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/mapper/destinations_mapper.go @@ -0,0 +1,69 @@ +package mapper + +import ( + "context" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/cache" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +func (m *Mapper) MapDestinationsToProxyStateTemplate(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { + var destinations pbmesh.Upstreams + err := res.Data.UnmarshalTo(&destinations) + if err != nil { + return nil, err + } + + // Look up workloads for this destinations. 
+ sourceProxyIDs := make(map[string]*pbresource.ID) + var result []controller.Request + for _, prefix := range destinations.Workloads.Prefixes { + resp, err := rt.Client.List(ctx, &pbresource.ListRequest{ + Type: catalog.WorkloadType, + Tenancy: res.Id.Tenancy, + NamePrefix: prefix, + }) + if err != nil { + return nil, err + } + for _, r := range resp.Resources { + proxyID := resource.ReplaceType(types.ProxyStateTemplateType, r.Id) + sourceProxyIDs[cache.KeyFromID(proxyID)] = proxyID + result = append(result, controller.Request{ + ID: proxyID, + }) + } + } + + for _, name := range destinations.Workloads.Names { + id := &pbresource.ID{ + Name: name, + Tenancy: res.Id.Tenancy, + Type: catalog.WorkloadType, + } + proxyID := resource.ReplaceType(types.ProxyStateTemplateType, id) + sourceProxyIDs[cache.KeyFromID(proxyID)] = proxyID + result = append(result, controller.Request{ + ID: proxyID, + }) + } + + // Add this destination to cache. + for _, destination := range destinations.Upstreams { + destinationRef := &intermediate.CombinedDestinationRef{ + ServiceRef: destination.DestinationRef, + Port: destination.DestinationPort, + ExplicitDestinationsID: res.Id, + SourceProxies: sourceProxyIDs, + } + m.Cache.Write(destinationRef) + } + + return result, nil +} diff --git a/internal/mesh/internal/controllers/mesh/mapper/destinations_mapper_test.go b/internal/mesh/internal/controllers/mesh/mapper/destinations_mapper_test.go new file mode 100644 index 0000000000000..d6aba0604db4d --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/mapper/destinations_mapper_test.go @@ -0,0 +1,99 @@ +package mapper + +import ( + "context" + "testing" + + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/cache" + "github.com/hashicorp/consul/internal/mesh/internal/types" + 
"github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/stretchr/testify/require" +) + +func TestMapDestinationsToProxyStateTemplate(t *testing.T) { + client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes) + webWorkload1 := resourcetest.Resource(catalog.WorkloadType, "web-abc"). + WithData(t, &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{"tcp": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}}, + }). + Write(t, client) + webWorkload2 := resourcetest.Resource(catalog.WorkloadType, "web-def"). + WithData(t, &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.2"}}, + Ports: map[string]*pbcatalog.WorkloadPort{"tcp": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}}, + }). + Write(t, client) + webWorkload3 := resourcetest.Resource(catalog.WorkloadType, "non-prefix-web"). + WithData(t, &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.3"}}, + Ports: map[string]*pbcatalog.WorkloadPort{"tcp": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}}, + }). 
+ Write(t, client) + + webDestinationsData := &pbmesh.Upstreams{ + Workloads: &pbcatalog.WorkloadSelector{ + Names: []string{"non-prefix-web"}, + Prefixes: []string{"web"}, + }, + Upstreams: []*pbmesh.Upstream{ + { + DestinationRef: resourcetest.Resource(catalog.ServiceType, "api-1").ReferenceNoSection(), + DestinationPort: "tcp", + }, + { + DestinationRef: resourcetest.Resource(catalog.ServiceType, "api-2").ReferenceNoSection(), + DestinationPort: "tcp1", + }, + { + DestinationRef: resourcetest.Resource(catalog.ServiceType, "api-2").ReferenceNoSection(), + DestinationPort: "tcp2", + }, + }, + } + + webDestinations := resourcetest.Resource(types.UpstreamsType, "web-destinations"). + WithData(t, webDestinationsData). + Write(t, client) + + c := cache.New() + mapper := &Mapper{Cache: c} + + expRequests := []controller.Request{ + {ID: resource.ReplaceType(types.ProxyStateTemplateType, webWorkload1.Id)}, + {ID: resource.ReplaceType(types.ProxyStateTemplateType, webWorkload2.Id)}, + {ID: resource.ReplaceType(types.ProxyStateTemplateType, webWorkload3.Id)}, + } + + requests, err := mapper.MapDestinationsToProxyStateTemplate(context.Background(), controller.Runtime{Client: client}, webDestinations) + require.NoError(t, err) + prototest.AssertElementsMatch(t, expRequests, requests) + + //var expDestinations []*intermediate.CombinedDestinationRef + proxy1ID := resourcetest.Resource(types.ProxyStateTemplateType, webWorkload1.Id.Name).ID() + proxy2ID := resourcetest.Resource(types.ProxyStateTemplateType, webWorkload2.Id.Name).ID() + proxy3ID := resourcetest.Resource(types.ProxyStateTemplateType, webWorkload3.Id.Name).ID() + for _, u := range webDestinationsData.Upstreams { + expDestination := &intermediate.CombinedDestinationRef{ + ServiceRef: u.DestinationRef, + Port: u.DestinationPort, + ExplicitDestinationsID: webDestinations.Id, + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(proxy1ID): proxy1ID, + cache.KeyFromID(proxy2ID): proxy2ID, + 
cache.KeyFromID(proxy3ID): proxy3ID, + }, + } + prototest.AssertDeepEqual(t, expDestination, c.ReadDestination(u.DestinationRef, u.DestinationPort)) + } + +} diff --git a/internal/mesh/internal/controllers/mesh/mapper/service_endpoints_mapper.go b/internal/mesh/internal/controllers/mesh/mapper/service_endpoints_mapper.go new file mode 100644 index 0000000000000..8174daf999941 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/mapper/service_endpoints_mapper.go @@ -0,0 +1,63 @@ +package mapper + +import ( + "context" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/cache" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +type Mapper struct { + Cache *cache.Cache +} + +// MapServiceEndpointsToProxyStateTemplate maps catalog.ServiceEndpoints objects to the IDs of +// ProxyStateTemplate. +// For a destination proxy, we only need to generate requests from workloads this "endpoints" points to +// so that we can re-generate proxy state for the sidecar proxy. +// If this service endpoints is a source for some proxies, we need to generate requests for those proxies as well. +// so we need to have a map from service endpoints to source proxy Ids. +func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { + // This mapper needs to look up workload IDs from service endpoints and replace them with ProxyStateTemplate type. 
+ var serviceEndpoints pbcatalog.ServiceEndpoints + err := res.Data.UnmarshalTo(&serviceEndpoints) + if err != nil { + return nil, err + } + + var result []controller.Request + + for _, endpoint := range serviceEndpoints.Endpoints { + // Convert the reference to a workload to a ProxyStateTemplate ID. + // Because these resources are name and tenancy aligned, we only need to change the type. + result = append(result, controller.Request{ + ID: &pbresource.ID{ + Name: endpoint.TargetRef.Name, + Tenancy: endpoint.TargetRef.Tenancy, + Type: types.ProxyStateTemplateType, + }, + }) + } + + // Look up any source proxies for this service and generate updates. + serviceID := resource.ReplaceType(catalog.ServiceType, res.Id) + + // todo (ishustava): assume for now that all endpoints have the same ports. This will be adjusted in the future. + if len(serviceEndpoints.Endpoints) > 0 { + for portName := range serviceEndpoints.Endpoints[0].Ports { + destination := m.Cache.ReadDestination(resource.Reference(serviceID, ""), portName) + if destination != nil { + for _, id := range destination.SourceProxies { + result = append(result, controller.Request{ID: id}) + } + } + } + } + + return result, err +} diff --git a/internal/mesh/internal/controllers/mesh/mapper/service_endpoints_mapper_test.go b/internal/mesh/internal/controllers/mesh/mapper/service_endpoints_mapper_test.go new file mode 100644 index 0000000000000..5e5605f7d15e0 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/mapper/service_endpoints_mapper_test.go @@ -0,0 +1,80 @@ +package mapper + +import ( + "context" + "testing" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/cache" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog 
"github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/stretchr/testify/require" +) + +func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) { + workload1 := resourcetest.Resource(catalog.WorkloadType, "workload-1").Build() + workload2 := resourcetest.Resource(catalog.WorkloadType, "workload-2").Build() + serviceEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "service"). + WithData(t, &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + TargetRef: workload1.Id, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp1": {Port: 8080}, + "tcp2": {Port: 8081}, + }, + }, + { + TargetRef: workload2.Id, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp1": {Port: 8080}, + "tcp2": {Port: 8081}, + }, + }, + }, + }).Build() + proxyTmpl1ID := resourcetest.Resource(types.ProxyStateTemplateType, "workload-1").ID() + proxyTmpl2ID := resourcetest.Resource(types.ProxyStateTemplateType, "workload-2").ID() + + c := cache.New() + mapper := &Mapper{Cache: c} + sourceProxy1 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-3").ID() + sourceProxy2 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-4").ID() + sourceProxy3 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-5").ID() + destination1 := &intermediate.CombinedDestinationRef{ + ServiceRef: resourcetest.Resource(catalog.ServiceType, "service").Reference(), + Port: "tcp1", + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(sourceProxy1): sourceProxy1, + cache.KeyFromID(sourceProxy2): sourceProxy2, + }, + } + destination2 := &intermediate.CombinedDestinationRef{ + ServiceRef: resourcetest.Resource(catalog.ServiceType, "service").Reference(), + Port: "tcp2", + SourceProxies: map[string]*pbresource.ID{ + cache.KeyFromID(sourceProxy1): sourceProxy1, + cache.KeyFromID(sourceProxy3): sourceProxy3, + }, + } + 
c.Write(destination1) + c.Write(destination2) + + expRequests := []controller.Request{ + {ID: proxyTmpl1ID}, + {ID: proxyTmpl2ID}, + {ID: sourceProxy1}, + {ID: sourceProxy2}, + {ID: sourceProxy1}, + {ID: sourceProxy3}, + } + + requests, err := mapper.MapServiceEndpointsToProxyStateTemplate(context.Background(), controller.Runtime{}, serviceEndpoints) + require.NoError(t, err) + prototest.AssertElementsMatch(t, expRequests, requests) +} diff --git a/internal/mesh/internal/controllers/mesh/mappers/service_endpoints.go b/internal/mesh/internal/controllers/mesh/mappers/service_endpoints.go deleted file mode 100644 index 9453691532bd3..0000000000000 --- a/internal/mesh/internal/controllers/mesh/mappers/service_endpoints.go +++ /dev/null @@ -1,40 +0,0 @@ -package mappers - -import ( - "context" - - "github.com/hashicorp/consul/internal/controller" - "github.com/hashicorp/consul/internal/mesh/internal/types" - pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" - "github.com/hashicorp/consul/proto-public/pbresource" -) - -// MapServiceEndpointsToProxyStateTemplate maps catalog.ServiceEndpoints objects to the IDs of -// ProxyStateTemplate. -// For a downstream proxy, we only need to generate requests from workloads this endpoints points to -// If this service endpoints is an upstream for some proxies, we need to generate requests for those proxies as well. -// so we need to have a map from service endpoints to downstream proxy Ids -func MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { - // This mapper needs to look up workload IDs from service endpoints and replace them with proxystatetemplatetype. 
- var serviceEndpoints pbcatalog.ServiceEndpoints - err := res.Data.UnmarshalTo(&serviceEndpoints) - if err != nil { - return nil, err - } - - var result []controller.Request - - for _, endpoint := range serviceEndpoints.Endpoints { - // Convert the reference to a workload to a ProxyStateTemplate ID. - // Because these resources are name and tenancy aligned, we only need to change the type. - result = append(result, controller.Request{ - ID: &pbresource.ID{ - Name: endpoint.TargetRef.Name, - Tenancy: endpoint.TargetRef.Tenancy, - Type: types.ProxyStateTemplateType, - }, - }) - } - - return result, err -} diff --git a/internal/mesh/internal/controllers/mesh/mappers/service_endpoints_test.go b/internal/mesh/internal/controllers/mesh/mappers/service_endpoints_test.go deleted file mode 100644 index 03e73150e9fac..0000000000000 --- a/internal/mesh/internal/controllers/mesh/mappers/service_endpoints_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package mappers - -import ( - "context" - "testing" - - "github.com/hashicorp/consul/internal/catalog" - "github.com/hashicorp/consul/internal/controller" - "github.com/hashicorp/consul/internal/mesh/internal/types" - "github.com/hashicorp/consul/internal/resource/resourcetest" - pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" - "github.com/stretchr/testify/require" -) - -func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) { - workload1 := resourcetest.Resource(catalog.WorkloadType, "workload-1").Build() - workload2 := resourcetest.Resource(catalog.WorkloadType, "workload-2").Build() - serviceEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "service"). 
- WithData(t, &pbcatalog.ServiceEndpoints{ - Endpoints: []*pbcatalog.Endpoint{ - { - TargetRef: workload1.Id, - }, - { - TargetRef: workload2.Id, - }, - }, - }).Build() - proxyTmpl1ID := resourcetest.Resource(types.ProxyStateTemplateType, "workload-1").ID() - proxyTmpl2ID := resourcetest.Resource(types.ProxyStateTemplateType, "workload-2").ID() - - expRequests := []controller.Request{ - { - ID: proxyTmpl1ID, - }, - { - ID: proxyTmpl2ID, - }, - } - - requests, err := MapServiceEndpointsToProxyStateTemplate(context.Background(), controller.Runtime{}, serviceEndpoints) - require.NoError(t, err) - require.ElementsMatch(t, expRequests, requests) -} diff --git a/internal/mesh/internal/controllers/mesh/status/status.go b/internal/mesh/internal/controllers/mesh/status/status.go new file mode 100644 index 0000000000000..0ba125fae6315 --- /dev/null +++ b/internal/mesh/internal/controllers/mesh/status/status.go @@ -0,0 +1,55 @@ +package status + +import ( + "fmt" + + "github.com/hashicorp/consul/proto-public/pbresource" +) + +const ( + StatusConditionMeshDestination = "MeshDestination" + + StatusReasonNonMeshDestination = "MeshPortProtocolNotFound" + StatusReasonMeshDestination = "MeshPortProtocolFound" + + StatusConditionDestinationExists = "DestinationExists" + + StatusReasonDestinationServiceNotFound = "ServiceNotFound" + StatusReasonDestinationServiceFound = "ServiceFound" +) + +func ConditionNonMeshDestination(serviceRef string) *pbresource.Condition { + return &pbresource.Condition{ + Type: StatusConditionMeshDestination, + State: pbresource.Condition_STATE_FALSE, + Reason: StatusReasonNonMeshDestination, + Message: fmt.Sprintf("service %q cannot be referenced as a Destination because it's not mesh-enabled.", serviceRef), + } +} + +func ConditionMeshDestination(serviceRef string) *pbresource.Condition { + return &pbresource.Condition{ + Type: StatusConditionMeshDestination, + State: pbresource.Condition_STATE_TRUE, + Reason: StatusReasonMeshDestination, + Message: 
fmt.Sprintf("service %q is on the mesh.", serviceRef), + } +} + +func ConditionDestinationServiceNotFound(serviceRef string) *pbresource.Condition { + return &pbresource.Condition{ + Type: StatusConditionDestinationExists, + State: pbresource.Condition_STATE_FALSE, + Reason: StatusReasonDestinationServiceNotFound, + Message: fmt.Sprintf("service %q does not exist.", serviceRef), + } +} + +func ConditionDestinationServiceFound(serviceRef string) *pbresource.Condition { + return &pbresource.Condition{ + Type: StatusConditionDestinationExists, + State: pbresource.Condition_STATE_TRUE, + Reason: StatusReasonDestinationServiceFound, + Message: fmt.Sprintf("service %q exists.", serviceRef), + } +} diff --git a/internal/mesh/internal/controllers/register.go b/internal/mesh/internal/controllers/register.go index 383362077caa1..27562b8d7fb50 100644 --- a/internal/mesh/internal/controllers/register.go +++ b/internal/mesh/internal/controllers/register.go @@ -6,8 +6,13 @@ package controllers import ( "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh" + "github.com/hashicorp/consul/internal/mesh/internal/controllers/mesh/cache" ) -func Register(mgr *controller.Manager) { - mgr.Register(mesh.Controller()) +type Dependencies struct { + TrustDomainFetcher mesh.TrustDomainFetcher +} + +func Register(mgr *controller.Manager, deps Dependencies) { + mgr.Register(mesh.Controller(cache.New(), deps.TrustDomainFetcher)) } diff --git a/internal/mesh/internal/types/intermediate/types.go b/internal/mesh/internal/types/intermediate/types.go new file mode 100644 index 0000000000000..533017bd36c2b --- /dev/null +++ b/internal/mesh/internal/types/intermediate/types.go @@ -0,0 +1,56 @@ +package intermediate + +import ( + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// todo 
should it be destination? +// the problem is that it's compiled from different source objects +type CombinedDestinationRef struct { + // ServiceRef is the reference to the destination service for this upstream + ServiceRef *pbresource.Reference + + Port string + + // sourceProxies are the IDs of source proxy state template resources. + SourceProxies map[string]*pbresource.ID + + // explicitUpstreamID is the id of an explicit upstreams resource. For implicit upstreams, + // this should be nil. + ExplicitDestinationsID *pbresource.ID +} + +type ServiceEndpoints struct { + Resource *pbresource.Resource + Endpoints *pbcatalog.ServiceEndpoints +} + +type Destinations struct { + Resource *pbresource.Resource + Destinations *pbmesh.Upstreams +} + +type Workload struct { + Resource *pbresource.Resource + Workload *pbcatalog.Workload +} + +type ProxyStateTemplate struct { + Resource *pbresource.Resource + Tmpl *pbmesh.ProxyStateTemplate +} + +type Destination struct { + Explicit *pbmesh.Upstream + ServiceEndpoints *ServiceEndpoints + Identities []*pbresource.Reference +} + +type Status struct { + ID *pbresource.ID + Generation string + Conditions []*pbresource.Condition + OldStatus map[string]*pbresource.Status +} diff --git a/internal/mesh/internal/types/upstreams.go b/internal/mesh/internal/types/upstreams.go index 8ccb1554593bc..14364e3e2ae3e 100644 --- a/internal/mesh/internal/types/upstreams.go +++ b/internal/mesh/internal/types/upstreams.go @@ -10,7 +10,7 @@ import ( ) const ( - UpstreamsKind = "Upstreams" + UpstreamsKind = "Destinations" ) var ( diff --git a/internal/resource/reference.go b/internal/resource/reference.go index 7610f6288d83a..acae27ff8a06b 100644 --- a/internal/resource/reference.go +++ b/internal/resource/reference.go @@ -37,3 +37,11 @@ var ( _ ReferenceOrID = (*pbresource.ID)(nil) _ ReferenceOrID = (*pbresource.Reference)(nil) ) + +func ReplaceType(typ *pbresource.Type, id *pbresource.ID) *pbresource.ID { + return &pbresource.ID{ + Type: typ, + 
Name: id.Name, + Tenancy: id.Tenancy, + } +} diff --git a/internal/resource/resourcetest/builder.go b/internal/resource/resourcetest/builder.go index 20858bef2749e..220dca97ed1d6 100644 --- a/internal/resource/resourcetest/builder.go +++ b/internal/resource/resourcetest/builder.go @@ -127,6 +127,10 @@ func (b *resourceBuilder) Reference(section string) *pbresource.Reference { return resource.Reference(b.ID(), section) } +func (b *resourceBuilder) ReferenceNoSection() *pbresource.Reference { + return resource.Reference(b.ID(), "") +} + func (b *resourceBuilder) Write(t T, client pbresource.ResourceServiceClient) *pbresource.Resource { t.Helper() diff --git a/proto-public/pbcatalog/v1alpha1/service_endpoints.pb.go b/proto-public/pbcatalog/v1alpha1/service_endpoints.pb.go index 903de0706bf23..436d8d0c9f03b 100644 --- a/proto-public/pbcatalog/v1alpha1/service_endpoints.pb.go +++ b/proto-public/pbcatalog/v1alpha1/service_endpoints.pb.go @@ -88,6 +88,8 @@ type Endpoint struct { Ports map[string]*WorkloadPort `protobuf:"bytes,3,rep,name=ports,proto3" json:"ports,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` // health_status is the aggregated health status of this endpoint. HealthStatus Health `protobuf:"varint,4,opt,name=health_status,json=healthStatus,proto3,enum=hashicorp.consul.catalog.v1alpha1.Health" json:"health_status,omitempty"` + // identity is the name of the workload identity for this endpoint. 
+ Identity string `protobuf:"bytes,5,opt,name=identity,proto3" json:"identity,omitempty"` } func (x *Endpoint) Reset() { @@ -150,6 +152,13 @@ func (x *Endpoint) GetHealthStatus() Health { return Health_HEALTH_ANY } +func (x *Endpoint) GetIdentity() string { + if x != nil { + return x.Identity + } + return "" +} + var File_pbcatalog_v1alpha1_service_endpoints_proto protoreflect.FileDescriptor var file_pbcatalog_v1alpha1_service_endpoints_proto_rawDesc = []byte{ @@ -169,7 +178,7 @@ var file_pbcatalog_v1alpha1_service_endpoints_proto_rawDesc = []byte{ 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, - 0x6e, 0x74, 0x52, 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x22, 0xa3, 0x03, + 0x6e, 0x74, 0x52, 0x09, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x22, 0xbf, 0x03, 0x0a, 0x08, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x3c, 0x0a, 0x0a, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x72, 0x65, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, @@ -189,33 +198,35 @@ var file_pbcatalog_v1alpha1_service_endpoints_proto_rawDesc = []byte{ 0x29, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x0c, 0x68, 0x65, 0x61, 0x6c, - 0x74, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x1a, 0x69, 0x0a, 0x0a, 0x50, 0x6f, 0x72, 0x74, - 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x45, 0x0a, 0x05, 0x76, 0x61, 0x6c, 
0x75, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, - 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x63, 0x61, 0x74, 0x61, 0x6c, - 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x57, 0x6f, 0x72, 0x6b, - 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x6f, 0x72, 0x74, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, - 0x02, 0x38, 0x01, 0x42, 0xb2, 0x02, 0x0a, 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x68, 0x61, 0x73, 0x68, - 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x63, 0x61, 0x74, - 0x61, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x42, 0x15, 0x53, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x50, - 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x4b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, - 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, - 0x2f, 0x70, 0x62, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, - 0x68, 0x61, 0x31, 0x3b, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x76, 0x31, 0x61, 0x6c, 0x70, - 0x68, 0x61, 0x31, 0xa2, 0x02, 0x03, 0x48, 0x43, 0x43, 0xaa, 0x02, 0x21, 0x48, 0x61, 0x73, 0x68, - 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x43, 0x61, 0x74, - 0x61, 0x6c, 0x6f, 0x67, 0x2e, 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0xca, 0x02, 0x21, + 0x74, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x64, 0x65, 0x6e, + 0x74, 0x69, 0x74, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x69, 0x64, 0x65, 0x6e, + 0x74, 0x69, 0x74, 0x79, 0x1a, 0x69, 0x0a, 0x0a, 0x50, 0x6f, 0x72, 0x74, 0x73, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x03, 0x6b, 0x65, 0x79, 0x12, 
0x45, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, + 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x2e, 0x76, + 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, + 0x50, 0x6f, 0x72, 0x74, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, + 0xb2, 0x02, 0x0a, 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, + 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, + 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x42, 0x15, 0x53, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x50, 0x01, 0x5a, 0x4b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, + 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x2f, 0x70, 0x62, 0x63, + 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, + 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0xa2, + 0x02, 0x03, 0x48, 0x43, 0x43, 0xaa, 0x02, 0x21, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, + 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, + 0x2e, 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0xca, 0x02, 0x21, 0x48, 0x61, 0x73, 0x68, + 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x43, 0x61, 0x74, + 0x61, 0x6c, 0x6f, 0x67, 0x5c, 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0xe2, 0x02, 0x2d, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x5c, 0x56, 0x31, 0x61, 
0x6c, 0x70, 0x68, 0x61, - 0x31, 0xe2, 0x02, 0x2d, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, - 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x5c, 0x56, 0x31, 0x61, - 0x6c, 0x70, 0x68, 0x61, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0xea, 0x02, 0x24, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x3a, 0x3a, 0x43, - 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x3a, 0x3a, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x3a, 0x3a, - 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x24, + 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73, 0x75, + 0x6c, 0x3a, 0x3a, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x3a, 0x3a, 0x56, 0x31, 0x61, 0x6c, + 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto-public/pbcatalog/v1alpha1/service_endpoints.proto b/proto-public/pbcatalog/v1alpha1/service_endpoints.proto index df3c70e0312f3..2b5a8740258cb 100644 --- a/proto-public/pbcatalog/v1alpha1/service_endpoints.proto +++ b/proto-public/pbcatalog/v1alpha1/service_endpoints.proto @@ -29,4 +29,7 @@ message Endpoint { // health_status is the aggregated health status of this endpoint. Health health_status = 4; + + // identity is the name of the workload identity for this endpoint. + string identity = 5; }