diff --git a/internal/catalog/internal/controllers/endpoints/controller.go b/internal/catalog/internal/controllers/endpoints/controller.go index e2365ab66edf9..fdacc002fc7be 100644 --- a/internal/catalog/internal/controllers/endpoints/controller.go +++ b/internal/catalog/internal/controllers/endpoints/controller.go @@ -5,7 +5,6 @@ package endpoints import ( "context" - "fmt" "sort" "github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth" @@ -376,9 +375,6 @@ func workloadToEndpoint(svc *pbcatalog.Service, data *workloadData) *pbcatalog.E return nil } - if data.resource.Id == nil { - fmt.Println("-------------------iryna: workload id is nil") - } return &pbcatalog.Endpoint{ TargetRef: data.resource.Id, HealthStatus: health, diff --git a/internal/catalog/internal/controllers/endpoints/controller_test.go b/internal/catalog/internal/controllers/endpoints/controller_test.go index 4e4194ade1131..cdb6a504550d5 100644 --- a/internal/catalog/internal/controllers/endpoints/controller_test.go +++ b/internal/catalog/internal/controllers/endpoints/controller_test.go @@ -702,20 +702,6 @@ func (suite *controllerSuite) TestController() { endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version) rtest.RequireOwner(suite.T(), endpoints, updatedService.Id, false) - // ensure the endpoint was put into the passing state - suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{ - TargetRef: workload.Id, - Addresses: []*pbcatalog.WorkloadAddress{ - {Host: "127.0.0.1", Ports: []string{"grpc", "http"}}, - }, - Ports: map[string]*pbcatalog.WorkloadPort{ - "http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP}, - "grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC}, - }, - HealthStatus: pbcatalog.Health_HEALTH_PASSING, - Identity: "api", - }) - // Delete the endpoints. The controller should bring these back momentarily suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: endpointsID}) diff --git a/internal/mesh/internal/controllers/sidecar-proxy/builder/destination_builder.go b/internal/mesh/internal/controllers/sidecar-proxy/builder/destination_builder.go index 062ceb02f1eb0..f1975291b830b 100644 --- a/internal/mesh/internal/controllers/sidecar-proxy/builder/destination_builder.go +++ b/internal/mesh/internal/controllers/sidecar-proxy/builder/destination_builder.go @@ -23,15 +23,17 @@ func (b *Builder) buildExplicitDestination(destination *intermediate.Destination clusterName := DestinationClusterName(destination.Explicit.DestinationRef, destination.Explicit.Datacenter, b.trustDomain) statPrefix := DestinationStatPrefix(destination.Explicit.DestinationRef, destination.Explicit.Datacenter) - // We assume that all endpoints have the same port. Later, we will change service endpoints to - // have the global ports map rather than per address. - destPort := destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports[destination.Explicit.DestinationPort] - - if destPort != nil { - return b.addOutboundDestinationListener(destination.Explicit). - addRouter(clusterName, statPrefix, destPort.Protocol). - addCluster(clusterName, destination.Identities). - addEndpointsRef(clusterName, destination.ServiceEndpoints.Resource.Id, destination.Explicit.DestinationPort) + // All endpoints should have the same protocol as the endpoints controller ensures that is the case, + // so it's sufficient to read just the first endpoint. + if len(destination.ServiceEndpoints.Endpoints.Endpoints) > 0 { + destPort := destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports[destination.Explicit.DestinationPort] + + if destPort != nil { + return b.addOutboundDestinationListener(destination.Explicit). + addRouter(clusterName, statPrefix, destPort.Protocol). + addCluster(clusterName, destination.Identities). + addEndpointsRef(clusterName, destination.ServiceEndpoints.Resource.Id, destination.Explicit.DestinationPort) + } } return b @@ -67,10 +69,10 @@ func (b *Builder) addOutboundDestinationListener(explicit *pbmesh.Upstream) *Bui return b.addListener(listener) } -// for explicit destinations, we have no filter chain match, and filters based on port protocol func (b *Builder) addRouter(clusterName, statPrefix string, protocol pbcatalog.Protocol) *Builder { listener := b.getLastBuiltListener() + // For explicit destinations, we have no filter chain match, and filters are based on port protocol. switch protocol { case pbcatalog.Protocol_PROTOCOL_TCP: router := &pbproxystate.Router{ @@ -123,7 +125,6 @@ func (b *Builder) addCluster(clusterName string, destinationIdentities []*pbreso } func (b *Builder) addEndpointsRef(clusterName string, serviceEndpointsID *pbresource.ID, destinationPort string) *Builder { - // Finally, add endpoints references. b.proxyStateTemplate.RequiredEndpoints[clusterName] = &pbproxystate.EndpointRef{ Id: serviceEndpointsID, Port: destinationPort, diff --git a/internal/mesh/internal/controllers/sidecar-proxy/builder/local_app.go b/internal/mesh/internal/controllers/sidecar-proxy/builder/local_app.go index 5082ce298eed6..323863475b4f6 100644 --- a/internal/mesh/internal/controllers/sidecar-proxy/builder/local_app.go +++ b/internal/mesh/internal/controllers/sidecar-proxy/builder/local_app.go @@ -64,8 +64,10 @@ func (b *Builder) addInboundRouters(workload *pbcatalog.Workload) *Builder { listener := b.getLastBuiltListener() // Go through workload ports and add the first non-mesh port we see. + // Note that the order of ports is non-deterministic here but the xds generation + // code should make sure to send it in the same order to Envoy to avoid unnecessary + // updates. // todo (ishustava): Note we will need to support multiple ports in the future. - // todo (ishustava): make sure we always iterate through ports in the same order so we don't need to send more updates to envoy. for portName, port := range workload.Ports { clusterName := fmt.Sprintf("%s:%s", xdscommon.LocalAppClusterName, portName) if port.Protocol == pbcatalog.Protocol_PROTOCOL_TCP { diff --git a/internal/mesh/internal/controllers/sidecar-proxy/cache/cache.go b/internal/mesh/internal/controllers/sidecar-proxy/cache/cache.go index 5e8dd7d8b12d4..8eee2c10b9231 100644 --- a/internal/mesh/internal/controllers/sidecar-proxy/cache/cache.go +++ b/internal/mesh/internal/controllers/sidecar-proxy/cache/cache.go @@ -9,11 +9,11 @@ import ( "github.com/hashicorp/consul/proto-public/pbresource" ) -// Cache stores information needed for the mesh controller to reconcile efficiently. +// Cache stores information needed for the sidecar-proxy controller to reconcile efficiently. // This currently means storing a list of all destinations for easy look up // as well as indices of source proxies where those destinations are referenced. // -// It is the responsibility of controller and its subcomponents (like mapper and data fetcher) +// It is the responsibility of the controller and its subcomponents (like mapper and data fetcher) // to keep this cache up-to-date as we're observing new data. type Cache struct { lock sync.RWMutex diff --git a/internal/mesh/internal/controllers/sidecar-proxy/controller.go b/internal/mesh/internal/controllers/sidecar-proxy/controller.go index 2f1ea8c943770..ddaff28d5ad28 100644 --- a/internal/mesh/internal/controllers/sidecar-proxy/controller.go +++ b/internal/mesh/internal/controllers/sidecar-proxy/controller.go @@ -49,7 +49,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c // Instantiate a data fetcher to fetch all reconciliation data. dataFetcher := fetcher.Fetcher{Client: rt.Client, Cache: r.cache} - // Check if the apiWorkload exists. + // Check if the workload exists. workloadID := resource.ReplaceType(catalog.WorkloadType, req.ID) workload, err := dataFetcher.FetchWorkload(ctx, resource.ReplaceType(catalog.WorkloadType, req.ID)) if err != nil { @@ -57,7 +57,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c return err } if workload == nil { - // If apiWorkload has been deleted, then return as ProxyStateTemplate should be cleaned up + // If workload has been deleted, then return as ProxyStateTemplate should be cleaned up // by the garbage collector because of the owner reference. rt.Logger.Trace("workload doesn't exist; skipping reconciliation", "workload", workloadID) return nil @@ -70,7 +70,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c } if proxyStateTemplate == nil { - // If proxy state template has been deleted + // If proxy state template has been deleted, we will need to generate a new one. rt.Logger.Trace("proxy state template for this workload doesn't yet exist; generating a new one", "id", req.ID) } diff --git a/internal/mesh/internal/controllers/sidecar-proxy/fetcher/data_fetcher.go b/internal/mesh/internal/controllers/sidecar-proxy/fetcher/data_fetcher.go index ac9e8cc207794..98db40439382a 100644 --- a/internal/mesh/internal/controllers/sidecar-proxy/fetcher/data_fetcher.go +++ b/internal/mesh/internal/controllers/sidecar-proxy/fetcher/data_fetcher.go @@ -203,7 +203,7 @@ func (f *Fetcher) FetchDestinationsData( return destinations, statuses, nil } -// IsMeshEnabled returns true if apiWorkload or service endpoints port +// IsMeshEnabled returns true if the workload or service endpoints port // contain a port with the "mesh" protocol. func IsMeshEnabled(ports map[string]*pbcatalog.WorkloadPort) bool { for _, port := range ports { diff --git a/internal/mesh/internal/types/intermediate/types.go b/internal/mesh/internal/types/intermediate/types.go index d7c033f9060d5..4de297822ef18 100644 --- a/internal/mesh/internal/types/intermediate/types.go +++ b/internal/mesh/internal/types/intermediate/types.go @@ -6,15 +6,17 @@ import ( "github.com/hashicorp/consul/proto-public/pbresource" ) -// todo should it be destination? -// the problem is that it's compiled from different source objects +// CombinedDestinationRef contains all references we need for a specific +// destination on the mesh. type CombinedDestinationRef struct { - // ServiceRef is the reference to the destination service for this upstream + // ServiceRef is the reference to the destination service. ServiceRef *pbresource.Reference + // Port is the port name for this destination. Port string - // sourceProxies are the IDs of source proxy state template resources. + // SourceProxies are the IDs of source proxy state template resources. + // The keys are a string representation of *pbresource.ID. SourceProxies map[string]*pbresource.ID // explicitUpstreamID is the id of an explicit upstreams resource. For implicit upstreams, diff --git a/internal/mesh/internal/types/upstreams.go b/internal/mesh/internal/types/upstreams.go index 14364e3e2ae3e..8ccb1554593bc 100644 --- a/internal/mesh/internal/types/upstreams.go +++ b/internal/mesh/internal/types/upstreams.go @@ -10,7 +10,7 @@ import ( ) const ( - UpstreamsKind = "Destinations" + UpstreamsKind = "Upstreams" ) var (