Skip to content

Commit

Permalink
small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ishustava committed Aug 4, 2023
1 parent 81c0910 commit 2cda1e9
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 41 deletions.
4 changes: 0 additions & 4 deletions internal/catalog/internal/controllers/endpoints/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package endpoints

import (
"context"
"fmt"
"sort"

"github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth"
Expand Down Expand Up @@ -376,9 +375,6 @@ func workloadToEndpoint(svc *pbcatalog.Service, data *workloadData) *pbcatalog.E
return nil
}

if data.resource.Id == nil {
fmt.Println("-------------------iryna: workload id is nil")
}
return &pbcatalog.Endpoint{
TargetRef: data.resource.Id,
HealthStatus: health,
Expand Down
14 changes: 0 additions & 14 deletions internal/catalog/internal/controllers/endpoints/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,20 +702,6 @@ func (suite *controllerSuite) TestController() {
endpoints = suite.client.WaitForNewVersion(suite.T(), endpointsID, endpoints.Version)
rtest.RequireOwner(suite.T(), endpoints, updatedService.Id, false)

// ensure the endpoint was put into the passing state
suite.requireEndpoints(endpoints, &pbcatalog.Endpoint{
TargetRef: workload.Id,
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1", Ports: []string{"grpc", "http"}},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"http": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
"grpc": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_GRPC},
},
HealthStatus: pbcatalog.Health_HEALTH_PASSING,
Identity: "api",
})

// Delete the endpoints. The controller should bring these back momentarily
suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: endpointsID})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ func (b *Builder) buildExplicitDestination(destination *intermediate.Destination
clusterName := DestinationClusterName(destination.Explicit.DestinationRef, destination.Explicit.Datacenter, b.trustDomain)
statPrefix := DestinationStatPrefix(destination.Explicit.DestinationRef, destination.Explicit.Datacenter)

// We assume that all endpoints have the same port. Later, we will change service endpoints to
// have the global ports map rather than per address.
destPort := destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports[destination.Explicit.DestinationPort]

if destPort != nil {
return b.addOutboundDestinationListener(destination.Explicit).
addRouter(clusterName, statPrefix, destPort.Protocol).
addCluster(clusterName, destination.Identities).
addEndpointsRef(clusterName, destination.ServiceEndpoints.Resource.Id, destination.Explicit.DestinationPort)
// All endpoints should have the same protocol as the endpoints controller ensures that is the case,
// so it's sufficient to read just the first endpoint.
if len(destination.ServiceEndpoints.Endpoints.Endpoints) > 0 {
destPort := destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports[destination.Explicit.DestinationPort]

if destPort != nil {
return b.addOutboundDestinationListener(destination.Explicit).
addRouter(clusterName, statPrefix, destPort.Protocol).
addCluster(clusterName, destination.Identities).
addEndpointsRef(clusterName, destination.ServiceEndpoints.Resource.Id, destination.Explicit.DestinationPort)
}
}

return b
Expand Down Expand Up @@ -67,10 +69,10 @@ func (b *Builder) addOutboundDestinationListener(explicit *pbmesh.Upstream) *Bui
return b.addListener(listener)
}

// for explicit destinations, we have no filter chain match, and filters based on port protocol
func (b *Builder) addRouter(clusterName, statPrefix string, protocol pbcatalog.Protocol) *Builder {
listener := b.getLastBuiltListener()

// For explicit destinations, we have no filter chain match, and filters are based on port protocol.
switch protocol {
case pbcatalog.Protocol_PROTOCOL_TCP:
router := &pbproxystate.Router{
Expand Down Expand Up @@ -123,7 +125,6 @@ func (b *Builder) addCluster(clusterName string, destinationIdentities []*pbreso
}

func (b *Builder) addEndpointsRef(clusterName string, serviceEndpointsID *pbresource.ID, destinationPort string) *Builder {
// Finally, add endpoints references.
b.proxyStateTemplate.RequiredEndpoints[clusterName] = &pbproxystate.EndpointRef{
Id: serviceEndpointsID,
Port: destinationPort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ func (b *Builder) addInboundRouters(workload *pbcatalog.Workload) *Builder {
listener := b.getLastBuiltListener()

// Go through workload ports and add the first non-mesh port we see.
// Note that the order of ports is non-deterministic here but the xds generation
// code should make sure to send it in the same order to Envoy to avoid unnecessary
// updates.
// todo (ishustava): Note we will need to support multiple ports in the future.
// todo (ishustava): make sure we always iterate through ports in the same order so we don't need to send more updates to envoy.
for portName, port := range workload.Ports {
clusterName := fmt.Sprintf("%s:%s", xdscommon.LocalAppClusterName, portName)
if port.Protocol == pbcatalog.Protocol_PROTOCOL_TCP {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"github.com/hashicorp/consul/proto-public/pbresource"
)

// Cache stores information needed for the mesh controller to reconcile efficiently.
// Cache stores information needed for the sidecar-proxy controller to reconcile efficiently.
// This currently means storing a list of all destinations for easy look up
// as well as indices of source proxies where those destinations are referenced.
//
// It is the responsibility of controller and its subcomponents (like mapper and data fetcher)
// It is the responsibility of the controller and its subcomponents (like mapper and data fetcher)
// to keep this cache up-to-date as we're observing new data.
type Cache struct {
lock sync.RWMutex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
// Instantiate a data fetcher to fetch all reconciliation data.
dataFetcher := fetcher.Fetcher{Client: rt.Client, Cache: r.cache}

// Check if the apiWorkload exists.
// Check if the workload exists.
workloadID := resource.ReplaceType(catalog.WorkloadType, req.ID)
workload, err := dataFetcher.FetchWorkload(ctx, resource.ReplaceType(catalog.WorkloadType, req.ID))
if err != nil {
rt.Logger.Error("error reading the associated workload", "error", err)
return err
}
if workload == nil {
// If apiWorkload has been deleted, then return as ProxyStateTemplate should be cleaned up
// If workload has been deleted, then return as ProxyStateTemplate should be cleaned up
// by the garbage collector because of the owner reference.
rt.Logger.Trace("workload doesn't exist; skipping reconciliation", "workload", workloadID)
return nil
Expand All @@ -70,7 +70,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
}

if proxyStateTemplate == nil {
// If proxy state template has been deleted
// If proxy state template has been deleted, we will need to generate a new one.
rt.Logger.Trace("proxy state template for this workload doesn't yet exist; generating a new one", "id", req.ID)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (f *Fetcher) FetchDestinationsData(
return destinations, statuses, nil
}

// IsMeshEnabled returns true if apiWorkload or service endpoints port
// IsMeshEnabled returns true if the workload or service endpoints port
// contain a port with the "mesh" protocol.
func IsMeshEnabled(ports map[string]*pbcatalog.WorkloadPort) bool {
for _, port := range ports {
Expand Down
10 changes: 6 additions & 4 deletions internal/mesh/internal/types/intermediate/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ import (
"github.com/hashicorp/consul/proto-public/pbresource"
)

// todo should it be destination?
// the problem is that it's compiled from different source objects
// CombinedDestinationRef contains all references we need for a specific
// destination on the mesh.
type CombinedDestinationRef struct {
// ServiceRef is the reference to the destination service for this upstream
// ServiceRef is the reference to the destination service.
ServiceRef *pbresource.Reference

// Port is the port name for this destination.
Port string

// sourceProxies are the IDs of source proxy state template resources.
// SourceProxies are the IDs of source proxy state template resources.
// The keys are a string representation of *pbresource.ID.
SourceProxies map[string]*pbresource.ID

// explicitUpstreamID is the id of an explicit upstreams resource. For implicit upstreams,
Expand Down
2 changes: 1 addition & 1 deletion internal/mesh/internal/types/upstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

const (
UpstreamsKind = "Destinations"
UpstreamsKind = "Upstreams"
)

var (
Expand Down

0 comments on commit 2cda1e9

Please sign in to comment.