Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NET-5586][rebased] v2: Support virtual port references in config #20371

Merged
merged 5 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
}
],
"ports": {
"external-service-port": {
"ext-svc-port": {
"port": 9876,
"protocol": "PROTOCOL_HTTP2"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"@type": "hashicorp.consul.catalog.v2beta1.Service",
"ports": [
{
"target_port": "external-service-port",
"target_port": "ext-svc-port",
"protocol": "PROTOCOL_HTTP2"
}
]
Expand Down
2 changes: 1 addition & 1 deletion internal/catalog/catalogtest/test_integration_v2beta1.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func expectedFooServiceEndpoints() *pbcatalog.ServiceEndpoints {
{Host: "198.18.0.1"},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"external-service-port": {
"ext-svc-port": {
Port: 9876,
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP2,
},
Expand Down
4 changes: 2 additions & 2 deletions internal/catalog/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func ValidateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
return types.ValidateSelector(sel, allowEmpty)
}

func ValidatePortName(name string) error {
return types.ValidatePortName(name)
func ValidateServicePortID(id string) error {
return types.ValidateServicePortID(id)
}

func IsValidUnixSocketPath(host string) bool {
Expand Down
34 changes: 27 additions & 7 deletions internal/catalog/internal/controllers/failover/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package failover
import (
"context"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/hashicorp/consul/internal/catalog/internal/controllers/failover/expander"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
Expand All @@ -16,8 +19,6 @@ import (
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

const (
Expand Down Expand Up @@ -89,6 +90,8 @@ func (r *failoverPolicyReconciler) Reconcile(ctx context.Context, rt controller.

return nil
}
// Capture original raw config for pre-normalization status conditions.
rawFailoverPolicy := failoverPolicy.Data

// FailoverPolicy is name-aligned with the Service it controls.
serviceID := &pbresource.ID{
Expand Down Expand Up @@ -149,13 +152,13 @@ func (r *failoverPolicyReconciler) Reconcile(ctx context.Context, rt controller.
}
}

conds := computeNewConditions(failoverPolicy.Resource, newComputedFailoverPolicy, service, destServices, missingSamenessGroups)
conds := computeNewConditions(rawFailoverPolicy, failoverPolicy.Resource, newComputedFailoverPolicy, service, destServices, missingSamenessGroups)
if err := writeStatus(ctx, rt, failoverPolicy.Resource, conds); err != nil {
rt.Logger.Error("error encountered when attempting to update the resource's failover policy status", "error", err)
return err
}

conds = computeNewConditions(computedFailoverResource, newComputedFailoverPolicy, service, destServices, missingSamenessGroups)
conds = computeNewConditions(rawFailoverPolicy, computedFailoverResource, newComputedFailoverPolicy, service, destServices, missingSamenessGroups)
if err := writeStatus(ctx, rt, computedFailoverResource, conds); err != nil {
rt.Logger.Error("error encountered when attempting to update the resource's computed failover policy status", "error", err)
return err
Expand All @@ -165,6 +168,7 @@ func (r *failoverPolicyReconciler) Reconcile(ctx context.Context, rt controller.
}

func computeNewConditions(
rawFailoverPolicy *pbcatalog.FailoverPolicy,
fpRes *pbresource.Resource,
fp *pbcatalog.ComputedFailoverPolicy,
service *resource.DecodedResource[*pbcatalog.Service],
Expand All @@ -182,11 +186,27 @@ func computeNewConditions(

var conditions []*pbresource.Condition

for port, pc := range fp.GetPortConfigs() {
if _, ok := allowedPortProtocols[port]; !ok {
conditions = append(conditions, ConditionUnknownPort(port))
if rawFailoverPolicy != nil {
// We need to validate port mappings on the raw input config due to the
// possibility of duplicate mappings, which will be normalized into one
// mapping by target port key.
usedTargetPorts := make(map[string]any)
for port := range rawFailoverPolicy.PortConfigs {
svcPort := service.Data.FindPortByID(port)
targetPort := svcPort.GetTargetPort() // svcPort could be nil

serviceRef := resource.NewReferenceKey(service.Id).ToReference()
if svcPort == nil {
conditions = append(conditions, ConditionUnknownPort(serviceRef, port))
} else if _, ok := usedTargetPorts[targetPort]; ok {
conditions = append(conditions, ConditionConflictDestinationPort(serviceRef, svcPort))
} else {
usedTargetPorts[targetPort] = struct{}{}
}
}
}

for _, pc := range fp.GetPortConfigs() {
for _, dest := range pc.Destinations {
// We know from validation that a Ref must be set, and the type it
// points to is a Service.
Expand Down
115 changes: 98 additions & 17 deletions internal/catalog/internal/controllers/failover/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ func TestController(t *testing.T) {
apiServiceData := &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Prefixes: []string{"api-"}},
Ports: []*pbcatalog.ServicePort{{
TargetPort: "http",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
VirtualPort: 8080,
TargetPort: "http",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
}},
}
svc := rtest.Resource(pbcatalog.ServiceType, "api").
Expand Down Expand Up @@ -108,7 +109,60 @@ func TestController(t *testing.T) {

t.Logf("reconciled to accepted")

// Update the failover to reference an unknown port
// Update the failover to reference a port twice (once by virtual, once by target port)
failoverData = &pbcatalog.FailoverPolicy{
PortConfigs: map[string]*pbcatalog.FailoverConfig{
"http": {
Destinations: []*pbcatalog.FailoverDestination{{
Ref: apiServiceRef,
Port: "http",
}},
},
"8080": {
Destinations: []*pbcatalog.FailoverDestination{{
Ref: apiServiceRef,
Port: "http",
}},
},
},
}
failover = rtest.Resource(pbcatalog.FailoverPolicyType, "api").
WithData(t, failoverData).
WithTenancy(tenancy).
Write(t, client)

t.Cleanup(func() { client.MustDelete(t, failover.Id) })

// Assert that the FailoverPolicy has the conflict condition.
client.WaitForStatusCondition(t, failover.Id, ControllerID, ConditionConflictDestinationPort(apiServiceRef, &pbcatalog.ServicePort{
VirtualPort: 8080,
TargetPort: "http",
}))

// Assert that the ComputedFailoverPolicy has the conflict condition.
// The port normalization that occurs in the call to SimplifyFailoverPolicy results in the port being
// removed from the final FailoverPolicy and ComputedFailoverPolicy.
expFailoverData := &pbcatalog.FailoverPolicy{
PortConfigs: map[string]*pbcatalog.FailoverConfig{
"http": {
Destinations: []*pbcatalog.FailoverDestination{{
Ref: apiServiceRef,
Port: "http",
}},
},
},
}
expectedComputedFP = &pbcatalog.ComputedFailoverPolicy{
PortConfigs: expFailoverData.PortConfigs,
BoundReferences: []*pbresource.Reference{apiServiceRef},
}
waitAndAssertComputedFailoverPolicy(t, client, failover.Id, expectedComputedFP, ConditionConflictDestinationPort(apiServiceRef, &pbcatalog.ServicePort{
VirtualPort: 8080,
TargetPort: "http",
}))
t.Logf("reconciled to using duplicate destination port")

// Update the failover to fix the duplicate, but reference an unknown port
failoverData = &pbcatalog.FailoverPolicy{
PortConfigs: map[string]*pbcatalog.FailoverConfig{
"http": {
Expand All @@ -132,27 +186,50 @@ func TestController(t *testing.T) {

t.Cleanup(func() { client.MustDelete(t, failover.Id) })

// Assert that the FailoverPolicy has the unknown condition.
client.WaitForStatusCondition(t, failover.Id, ControllerID, ConditionUnknownPort(apiServiceRef, "admin"))

// Assert that the ComputedFailoverPolicy has the unknown condition.
// The port normalization that occurs in the call to SimplifyFailoverPolicy results in the port being
// removed from the final FailoverPolicy and ComputedFailoverPolicy.
expFailoverData = &pbcatalog.FailoverPolicy{
PortConfigs: map[string]*pbcatalog.FailoverConfig{
"http": {
Destinations: []*pbcatalog.FailoverDestination{{
Ref: apiServiceRef,
Port: "http",
}},
},
},
}
expectedComputedFP = &pbcatalog.ComputedFailoverPolicy{
PortConfigs: failoverData.PortConfigs,
PortConfigs: expFailoverData.PortConfigs,
BoundReferences: []*pbresource.Reference{apiServiceRef},
}
waitAndAssertComputedFailoverPolicy(t, client, failover.Id, expectedComputedFP, ConditionUnknownPort("admin"))
waitAndAssertComputedFailoverPolicy(t, client, failover.Id, expectedComputedFP, ConditionUnknownPort(apiServiceRef, "admin"))
t.Logf("reconciled to unknown admin port")

// update the service to fix the stray reference, but point to a mesh port
apiServiceData = &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{Prefixes: []string{"api-"}},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "http",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
TargetPort: "http",
VirtualPort: 8080,
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "admin",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
TargetPort: "admin",
VirtualPort: 10000,
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
},
}
// update the expected ComputedFailoverPolicy to add back in the admin port as well
expectedComputedFP = &pbcatalog.ComputedFailoverPolicy{
PortConfigs: failoverData.PortConfigs,
BoundReferences: []*pbresource.Reference{apiServiceRef},
}
svc = rtest.Resource(pbcatalog.ServiceType, "api").
WithData(t, apiServiceData).
WithTenancy(tenancy).
Expand All @@ -168,12 +245,14 @@ func TestController(t *testing.T) {
Workloads: &pbcatalog.WorkloadSelector{Prefixes: []string{"api-"}},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "http",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
VirtualPort: 8080,
TargetPort: "http",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "admin",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
VirtualPort: 10000,
TargetPort: "admin",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
},
},
}
Expand Down Expand Up @@ -253,12 +332,14 @@ func TestController(t *testing.T) {
Workloads: &pbcatalog.WorkloadSelector{Prefixes: []string{"other-"}},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "http",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
VirtualPort: 8080,
TargetPort: "http",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "admin",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
VirtualPort: 10000,
TargetPort: "admin",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
},
},
}
Expand Down
17 changes: 15 additions & 2 deletions internal/catalog/internal/controllers/failover/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package failover

import (
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)

Expand Down Expand Up @@ -32,6 +33,9 @@ const (

MissingSamenessGroupReason = "MissingSamenessGroup"
MissingSamenessGroupMessagePrefix = "referenced sameness group does not exist: "

ConflictDestinationPortReason = "ConflictDestinationPort"
ConflictDestinationPortMessagePrefix = "multiple configs found for port on destination service: "
)

var (
Expand All @@ -50,12 +54,12 @@ var (
}
)

func ConditionUnknownPort(port string) *pbresource.Condition {
func ConditionUnknownPort(ref *pbresource.Reference, port string) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionAccepted,
State: pbresource.Condition_STATE_FALSE,
Reason: UnknownPortReason,
Message: UnknownPortMessagePrefix + port,
Message: UnknownPortMessagePrefix + port + " on " + resource.ReferenceToString(ref),
}
}

Expand Down Expand Up @@ -94,3 +98,12 @@ func ConditionMissingSamenessGroup(ref *pbresource.Reference) *pbresource.Condit
Message: MissingSamenessGroupMessagePrefix + resource.ReferenceToString(ref),
}
}

func ConditionConflictDestinationPort(ref *pbresource.Reference, port *pbcatalog.ServicePort) *pbresource.Condition {
return &pbresource.Condition{
Type: StatusConditionAccepted,
State: pbresource.Condition_STATE_FALSE,
Reason: ConflictDestinationPortReason,
Message: ConflictDestinationPortMessagePrefix + port.ToPrintableString() + " on " + resource.ReferenceToString(ref),
}
}
4 changes: 3 additions & 1 deletion internal/catalog/internal/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
)

var (
errNotDNSLabel = errors.New(fmt.Sprintf("value must match regex: %s", dnsLabelRegex))
errNotDNSLabel = errors.New(fmt.Sprintf("value must be 1-63 characters and match regex: %s", dnsLabelRegex))
errNotPortName = errors.New(fmt.Sprintf("value must be 1-15 characters, contain at least 1 alpha, and match regex: %s", ianaSvcNameRegex))
errNotIPAddress = errors.New("value is not a valid IP address")
errUnixSocketMultiport = errors.New("Unix socket address references more than one port")
errInvalidPhysicalPort = errors.New("port number is outside the range 1 to 65535")
errInvalidVirtualPort = errors.New("port number is outside the range 0 to 65535")
errInvalidPortID = errors.New(fmt.Sprintf("value must be in the range 1 to 65535 or be 1-15 characters, contain at least 1 alpha, and match regex: %s", ianaSvcNameRegex))
errDNSWarningWeightOutOfRange = errors.New("DNS warning weight is outside the range 0 to 65535")
errDNSPassingWeightOutOfRange = errors.New("DNS passing weight is outside of the range 1 to 65535")
errLocalityZoneNoRegion = errors.New("locality region cannot be empty if the zone is set")
Expand Down
Loading
Loading