diff --git a/test-integ/catalogv2/explicit_destinations_test.go b/test-integ/catalogv2/explicit_destinations_test.go index a62d98b7c605..9c86e0595261 100644 --- a/test-integ/catalogv2/explicit_destinations_test.go +++ b/test-integ/catalogv2/explicit_destinations_test.go @@ -5,7 +5,6 @@ package catalogv2 import ( "fmt" - "strings" "testing" pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1" @@ -18,7 +17,7 @@ import ( "github.com/hashicorp/consul/test-integ/topoutil" ) -// TestBasicL4ExplicitDestination sets up the following: +// TestBasicL4ExplicitDestinations sets up the following: // // - 1 cluster (no peering / no wanfed) // - 3 servers in that cluster @@ -37,8 +36,8 @@ import ( // - part1/default // - default/nsa // - part1/nsa -func TestBasicL4ExplicitDestination(t *testing.T) { - cfg := testBasicL4ExplicitDestinationCreator{}.NewConfig(t) +func TestBasicL4ExplicitDestinations(t *testing.T) { + cfg := testBasicL4ExplicitDestinationsCreator{}.NewConfig(t) sp := sprawltest.Launch(t, cfg) @@ -69,20 +68,11 @@ func TestBasicL4ExplicitDestination(t *testing.T) { for _, ship := range ships { t.Run("relationship: "+ship.String(), func(t *testing.T) { var ( - svc = ship.Caller - u = ship.Upstream - clusterPrefix string + svc = ship.Caller + u = ship.Upstream ) - if u.Peer == "" { - if u.ID.PartitionOrDefault() == "default" { - clusterPrefix = strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.Cluster, "internal"}, ".") - } else { - clusterPrefix = strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.ID.Partition, u.Cluster, "internal-v1"}, ".") - } - } else { - clusterPrefix = strings.Join([]string{u.ID.Name, u.ID.Namespace, u.Peer, "external"}, ".") - } + clusterPrefix := clusterPrefixForUpstream(u) asserter.UpstreamEndpointStatus(t, svc, clusterPrefix+".", "HEALTHY", 1) asserter.HTTPServiceEchoes(t, svc, u.LocalPort, "") @@ -91,9 +81,9 @@ func TestBasicL4ExplicitDestination(t *testing.T) { } } -type testBasicL4ExplicitDestinationCreator struct{} +type testBasicL4ExplicitDestinationsCreator struct{} -func (c testBasicL4ExplicitDestinationCreator) NewConfig(t *testing.T) *topology.Config { +func (c testBasicL4ExplicitDestinationsCreator) NewConfig(t *testing.T) *topology.Config { const clusterName = "dc1" servers := topoutil.NewTopologyServerSet(clusterName+"-server", 3, []string{clusterName, "wan"}, nil) @@ -129,7 +119,7 @@ func (c testBasicL4ExplicitDestinationCreator) NewConfig(t *testing.T) *topology } } -func (c testBasicL4ExplicitDestinationCreator) topologyConfigAddNodes( +func (c testBasicL4ExplicitDestinationsCreator) topologyConfigAddNodes( t *testing.T, cluster *topology.Cluster, nodeName func() string, diff --git a/test-integ/catalogv2/helpers_test.go b/test-integ/catalogv2/helpers_test.go new file mode 100644 index 000000000000..f5e352779a29 --- /dev/null +++ b/test-integ/catalogv2/helpers_test.go @@ -0,0 +1,22 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package catalogv2 + +import ( + "strings" + + "github.com/hashicorp/consul/testing/deployer/topology" +) + +func clusterPrefixForUpstream(u *topology.Upstream) string { + if u.Peer == "" { + if u.ID.PartitionOrDefault() == "default" { + return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.Cluster, "internal"}, ".") + } else { + return strings.Join([]string{u.PortName, u.ID.Name, u.ID.Namespace, u.ID.Partition, u.Cluster, "internal-v1"}, ".") + } + } else { + return strings.Join([]string{u.ID.Name, u.ID.Namespace, u.Peer, "external"}, ".") + } +} diff --git a/test-integ/catalogv2/implicit_destinations_test.go b/test-integ/catalogv2/implicit_destinations_test.go new file mode 100644 index 000000000000..daf19945a9e3 --- /dev/null +++ b/test-integ/catalogv2/implicit_destinations_test.go @@ -0,0 +1,214 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package catalogv2 + +import ( + "fmt" + "testing" + + pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1" + "github.com/hashicorp/consul/proto-public/pbresource" + libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" + "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" + "github.com/hashicorp/consul/testing/deployer/sprawl/sprawltest" + "github.com/hashicorp/consul/testing/deployer/topology" + + "github.com/hashicorp/consul/test-integ/topoutil" +) + +// TestBasicL4ImplicitDestinations sets up the following: +// +// - 1 cluster (no peering / no wanfed) +// - 3 servers in that cluster +// - v2 arch is activated +// - for each tenancy, only using v2 constructs: +// - a server exposing 2 tcp ports +// - a client with transparent proxy enabled and no explicit upstreams +// - a traffic permission granting the client access to the service on all ports +// +// When this test is executed in CE it will only use the default/default +// tenancy. +// +// When this test is executed in Enterprise it will additionally test the same +// things within these tenancies: +// +// - part1/default +// - default/nsa +// - part1/nsa +func TestBasicL4ImplicitDestinations(t *testing.T) { + cfg := testBasicL4ImplicitDestinationsCreator{}.NewConfig(t) + + sp := sprawltest.Launch(t, cfg) + + var ( + asserter = topoutil.NewAsserter(sp) + + topo = sp.Topology() + cluster = topo.Clusters["dc1"] + + ships = topo.ComputeRelationships() + ) + + clientV2 := sp.ResourceServiceClientForCluster(cluster.Name) + + t.Log(topology.RenderRelationships(ships)) + + // Make sure things are truly in v2 not v1. + for _, name := range []string{ + "static-server", + "static-client", + } { + libassert.CatalogV2ServiceHasEndpointCount(t, clientV2, name, nil, 1) + } + + // Check relationships + for _, ship := range ships { + t.Run("relationship: "+ship.String(), func(t *testing.T) { + var ( + svc = ship.Caller + u = ship.Upstream + ) + + clusterPrefix := clusterPrefixForUpstream(u) + + asserter.UpstreamEndpointStatus(t, svc, clusterPrefix+".", "HEALTHY", 1) + if u.LocalPort > 0 { + asserter.HTTPServiceEchoes(t, svc, u.LocalPort, "") + } + asserter.FortioFetch2FortioName(t, svc, u, cluster.Name, u.ID) + }) + } +} + +type testBasicL4ImplicitDestinationsCreator struct{} + +func (c testBasicL4ImplicitDestinationsCreator) NewConfig(t *testing.T) *topology.Config { + const clusterName = "dc1" + + servers := topoutil.NewTopologyServerSet(clusterName+"-server", 3, []string{clusterName, "wan"}, nil) + + cluster := &topology.Cluster{ + Enterprise: utils.IsEnterprise(), + Name: clusterName, + Nodes: servers, + } + + lastNode := 0 + nodeName := func() string { + lastNode++ + return fmt.Sprintf("%s-box%d", clusterName, lastNode) + } + + c.topologyConfigAddNodes(t, cluster, nodeName, "default", "default") + if cluster.Enterprise { + c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "default") + c.topologyConfigAddNodes(t, cluster, nodeName, "part1", "nsa") + c.topologyConfigAddNodes(t, cluster, nodeName, "default", "nsa") + } + + return &topology.Config{ + Images: topoutil.TargetImages(), + Networks: []*topology.Network{ + {Name: clusterName}, + {Name: "wan", Type: "wan"}, + }, + Clusters: []*topology.Cluster{ + cluster, + }, + } +} + +func (c testBasicL4ImplicitDestinationsCreator) topologyConfigAddNodes( + t *testing.T, + cluster *topology.Cluster, + nodeName func() string, + partition, + namespace string, +) { + clusterName := cluster.Name + + newServiceID := func(name string) topology.ServiceID { + return topology.ServiceID{ + Partition: partition, + Namespace: namespace, + Name: name, + } + } + + tenancy := &pbresource.Tenancy{ + Partition: partition, + Namespace: namespace, + PeerName: "local", + } + + serverNode := &topology.Node{ + Kind: topology.NodeKindDataplane, + Version: topology.NodeVersionV2, + Partition: partition, + Name: nodeName(), + Services: []*topology.Service{ + topoutil.NewFortioServiceWithDefaults( + clusterName, + newServiceID("static-server"), + topology.NodeVersionV2, + func(svc *topology.Service) { + svc.EnableTransparentProxy = true + }, + ), + }, + } + clientNode := &topology.Node{ + Kind: topology.NodeKindDataplane, + Version: topology.NodeVersionV2, + Partition: partition, + Name: nodeName(), + Services: []*topology.Service{ + topoutil.NewFortioServiceWithDefaults( + clusterName, + newServiceID("static-client"), + topology.NodeVersionV2, + func(svc *topology.Service) { + svc.EnableTransparentProxy = true + svc.ImpliedUpstreams = []*topology.Upstream{ + { + ID: newServiceID("static-server"), + PortName: "http", + }, + { + ID: newServiceID("static-server"), + PortName: "http-alt", + }, + } + }, + ), + }, + } + trafficPerms := sprawltest.MustSetResourceData(t, &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbauth.TrafficPermissionsType, + Name: "static-server-perms", + Tenancy: tenancy, + }, + }, &pbauth.TrafficPermissions{ + Destination: &pbauth.Destination{ + IdentityName: "static-server", + }, + Action: pbauth.Action_ACTION_ALLOW, + Permissions: []*pbauth.Permission{{ + Sources: []*pbauth.Source{{ + IdentityName: "static-client", + Namespace: namespace, + }}, + }}, + }) + + cluster.Nodes = append(cluster.Nodes, + clientNode, + serverNode, + ) + + cluster.InitialResources = append(cluster.InitialResources, + trafficPerms, + ) +} diff --git a/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go b/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go index 586103c11127..b8c7b83f2a2c 100644 --- a/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go +++ b/test-integ/peering_commontopo/ac3_service_defaults_upstream_test.go @@ -11,14 +11,13 @@ import ( "testing" "time" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil/retry" + libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" "github.com/hashicorp/consul/testing/deployer/topology" "github.com/hashicorp/go-cleanhttp" "github.com/itchyny/gojq" "github.com/stretchr/testify/require" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/testutil/retry" - libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" ) var ac3SvcDefaultsSuites []sharedTopoSuite = []sharedTopoSuite{ @@ -185,7 +184,7 @@ func (s *ac3SvcDefaultsSuite) test(t *testing.T, ct *commonTopo) { // TODO: what is default? namespace? partition? clusterName := fmt.Sprintf("%s.default.%s.external", s.upstream.ID.Name, s.upstream.Peer) nonceStatus := http.StatusInsufficientStorage - url507 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort, + url507 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort(""), url.QueryEscape(fmt.Sprintf("http://localhost:%d/?status=%d", s.upstream.LocalPort, nonceStatus)), ) @@ -221,7 +220,7 @@ func (s *ac3SvcDefaultsSuite) test(t *testing.T, ct *commonTopo) { require.True(r, resultAsBool) }) - url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort, + url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", svcClient.ExposedPort(""), url.QueryEscape(fmt.Sprintf("http://localhost:%d/", s.upstream.LocalPort)), ) retry.RunWith(&retry.Timer{Timeout: time.Minute * 1, Wait: time.Millisecond * 500}, t, func(r *retry.R) { diff --git a/test-integ/peering_commontopo/ac4_proxy_defaults_test.go b/test-integ/peering_commontopo/ac4_proxy_defaults_test.go index c413820c6f2b..a19782bbabc9 100644 --- a/test-integ/peering_commontopo/ac4_proxy_defaults_test.go +++ b/test-integ/peering_commontopo/ac4_proxy_defaults_test.go @@ -180,11 +180,11 @@ func (s *ac4ProxyDefaultsSuite) test(t *testing.T, ct *commonTopo) { }) t.Run("HTTP service fails due to connection timeout", func(t *testing.T) { - url504 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort, + url504 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort(""), url.QueryEscape(fmt.Sprintf("http://localhost:%d/?delay=1000ms", s.upstream.LocalPort)), ) - url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort, + url200 := fmt.Sprintf("http://localhost:%d/fortio/fetch2?url=%s", client.ExposedPort(""), url.QueryEscape(fmt.Sprintf("http://localhost:%d/", s.upstream.LocalPort)), ) diff --git a/test-integ/topoutil/asserter.go b/test-integ/topoutil/asserter.go index 3795e1879498..dcf9da52d0a2 100644 --- a/test-integ/topoutil/asserter.go +++ b/test-integ/topoutil/asserter.go @@ -234,10 +234,21 @@ func (a *Asserter) fortioFetch2Upstream( ) (body []byte, res *http.Response) { t.Helper() - // TODO: fortioSvc.ID.Normalize()? or should that be up to the caller? + var actualURL string + if upstream.Implied { + actualURL = fmt.Sprintf("http://%s--%s--%s.virtual.consul:%d/%s", + upstream.ID.Name, + upstream.ID.Namespace, + upstream.ID.Partition, + upstream.VirtualPort, + path, + ) + } else { + actualURL = fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path) + } url := fmt.Sprintf("http://%s/fortio/fetch2?url=%s", addr, - url.QueryEscape(fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path)), + url.QueryEscape(actualURL), ) req, err := http.NewRequest(http.MethodPost, url, nil) @@ -246,6 +257,7 @@ func (a *Asserter) fortioFetch2Upstream( res, err = client.Do(req) require.NoError(t, err) defer res.Body.Close() + // not sure when these happen, suspect it's when the mesh gateway in the peer is not yet ready require.NotEqual(t, http.StatusServiceUnavailable, res.StatusCode) require.NotEqual(t, http.StatusGatewayTimeout, res.StatusCode) @@ -281,7 +293,13 @@ func (a *Asserter) FortioFetch2HeaderEcho(t *testing.T, fortioSvc *topology.Serv // similar to libassert.AssertFortioName, // uses the /fortio/fetch2 endpoint to hit the debug endpoint on the upstream, // and assert that the FORTIO_NAME == name -func (a *Asserter) FortioFetch2FortioName(t *testing.T, fortioSvc *topology.Service, upstream *topology.Upstream, clusterName string, sid topology.ServiceID) { +func (a *Asserter) FortioFetch2FortioName( + t *testing.T, + fortioSvc *topology.Service, + upstream *topology.Upstream, + clusterName string, + sid topology.ServiceID, +) { t.Helper() var ( @@ -295,6 +313,7 @@ func (a *Asserter) FortioFetch2FortioName(t *testing.T, fortioSvc *topology.Serv retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) { body, res := a.fortioFetch2Upstream(r, client, addr, upstream, path) + require.Equal(r, http.StatusOK, res.StatusCode) // TODO: not sure we should retry these? diff --git a/test-integ/topoutil/fixtures.go b/test-integ/topoutil/fixtures.go index 18742dd2c0b1..c8e9afbe10b0 100644 --- a/test-integ/topoutil/fixtures.go +++ b/test-integ/topoutil/fixtures.go @@ -41,10 +41,14 @@ func NewFortioServiceWithDefaults( } if nodeVersion == topology.NodeVersionV2 { - svc.Ports = map[string]int{ - "http": httpPort, - "http-alt": httpPort, - "grpc": grpcPort, + svc.Ports = map[string]*topology.Port{ + // TODO(rb/v2): once L7 works in v2 switch these back + "http": {Number: httpPort, Protocol: "tcp"}, + "http-alt": {Number: httpPort, Protocol: "tcp"}, + "grpc": {Number: grpcPort, Protocol: "tcp"}, + // "http": {Number: httpPort, Protocol: "http"}, + // "http-alt": {Number: httpPort, Protocol: "http"}, + // "grpc": {Number: grpcPort, Protocol: "grpc"}, } } else { svc.Port = httpPort diff --git a/test/integration/consul-container/libs/assert/envoy.go b/test/integration/consul-container/libs/assert/envoy.go index 35f9873741e2..076f2e1af62b 100644 --- a/test/integration/consul-container/libs/assert/envoy.go +++ b/test/integration/consul-container/libs/assert/envoy.go @@ -118,7 +118,7 @@ func AssertUpstreamEndpointStatusWithClient( clusterName, healthStatus) results, err := utils.JQFilter(clusters, filter) require.NoErrorf(r, err, "could not find cluster name %q: %v \n%s", clusterName, err, clusters) - require.Len(r, results, 1) // the final part of the pipeline is "length" which only ever returns 1 result + require.Len(r, results, 1, "clusters: "+clusters) // the final part of the pipeline is "length" which only ever returns 1 result result, err := strconv.Atoi(results[0]) assert.NoError(r, err) diff --git a/test/integration/consul-container/libs/assert/service.go b/test/integration/consul-container/libs/assert/service.go index 72e8ef06e705..7434a1d5e36f 100644 --- a/test/integration/consul-container/libs/assert/service.go +++ b/test/integration/consul-container/libs/assert/service.go @@ -63,7 +63,6 @@ func CatalogV2ServiceDoesNotExist(t *testing.T, client pbresource.ResourceServic // number of workload endpoints. func CatalogV2ServiceHasEndpointCount(t *testing.T, client pbresource.ResourceServiceClient, svc string, tenancy *pbresource.Tenancy, count int) { t.Helper() - require.False(t, count == 0) ctx := testutil.TestContext(t) retry.Run(t, func(r *retry.R) { diff --git a/testing/deployer/sprawl/catalog.go b/testing/deployer/sprawl/catalog.go index 75a4fb6fd677..b19626ba7df2 100644 --- a/testing/deployer/sprawl/catalog.go +++ b/testing/deployer/sprawl/catalog.go @@ -196,8 +196,9 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster if node.IsV2() { pending := serviceInstanceToResources(node, svc) - if _, ok := identityInfo[svc.ID]; !ok { - identityInfo[svc.ID] = pending.WorkloadIdentity + workloadID := topology.NewServiceID(svc.WorkloadIdentity, svc.ID.Namespace, svc.ID.Partition) + if _, ok := identityInfo[workloadID]; !ok { + identityInfo[workloadID] = pending.WorkloadIdentity } // Write workload @@ -230,6 +231,15 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster return err } } + if pending.ProxyConfiguration != nil { + res, err := pending.ProxyConfiguration.Build() + if err != nil { + return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.ProxyConfiguration.Resource.Id), err) + } + if _, err := s.writeResource(cluster, res); err != nil { + return err + } + } } else { if err := s.registerCatalogServiceV1(cluster, node, svc); err != nil { return fmt.Errorf("error registering service: %w", err) @@ -268,6 +278,7 @@ func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster }, Data: svcData, } + res, err := svcInfo.Build() if err != nil { return fmt.Errorf("error serializing resource %s: %w", util.IDToString(svcInfo.Resource.Id), err) @@ -482,10 +493,11 @@ func (r *Resource[V]) Build() (*pbresource.Resource, error) { } type ServiceResources struct { - Workload *Resource[*pbcatalog.Workload] - HealthStatuses []*Resource[*pbcatalog.HealthStatus] - Destinations *Resource[*pbmesh.Destinations] - WorkloadIdentity *Resource[*pbauth.WorkloadIdentity] + Workload *Resource[*pbcatalog.Workload] + HealthStatuses []*Resource[*pbcatalog.HealthStatus] + Destinations *Resource[*pbmesh.Destinations] + WorkloadIdentity *Resource[*pbauth.WorkloadIdentity] + ProxyConfiguration *Resource[*pbmesh.ProxyConfiguration] } func serviceInstanceToResources( @@ -506,8 +518,8 @@ func serviceInstanceToResources( ) for name, port := range svc.Ports { wlPorts[name] = &pbcatalog.WorkloadPort{ - Port: uint32(port), - Protocol: pbcatalog.Protocol_PROTOCOL_TCP, + Port: uint32(port.Number), + Protocol: port.ActualProtocol, } } @@ -534,21 +546,20 @@ func serviceInstanceToResources( }, }, } - - worloadIdentityRes = &Resource[*pbauth.WorkloadIdentity]{ + workloadIdentityRes = &Resource[*pbauth.WorkloadIdentity]{ Resource: &pbresource.Resource{ Id: &pbresource.ID{ Type: pbauth.WorkloadIdentityType, - Name: svc.ID.Name, + Name: svc.WorkloadIdentity, Tenancy: tenancy, }, - Metadata: svc.Meta, }, Data: &pbauth.WorkloadIdentity{}, } healthResList []*Resource[*pbcatalog.HealthStatus] destinationsRes *Resource[*pbmesh.Destinations] + proxyConfigRes *Resource[*pbmesh.ProxyConfiguration] ) if svc.HasCheck() { @@ -577,11 +588,6 @@ func serviceInstanceToResources( } if !svc.DisableServiceMesh { - workloadRes.Data.Ports["mesh"] = &pbcatalog.WorkloadPort{ - Port: uint32(svc.EnvoyPublicListenerPort), - Protocol: pbcatalog.Protocol_PROTOCOL_MESH, - } - destinationsRes = &Resource[*pbmesh.Destinations]{ Resource: &pbresource.Resource{ Id: &pbresource.ID{ @@ -615,13 +621,32 @@ func serviceInstanceToResources( } destinationsRes.Data.Destinations = append(destinationsRes.Data.Destinations, dest) } + + if svc.EnableTransparentProxy { + proxyConfigRes = &Resource[*pbmesh.ProxyConfiguration]{ + Resource: &pbresource.Resource{ + Id: &pbresource.ID{ + Type: pbmesh.ProxyConfigurationType, + Name: svc.Workload, + Tenancy: tenancy, + }, + }, + Data: &pbmesh.ProxyConfiguration{ + Workloads: selector, + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + }, + }, + } + } } return &ServiceResources{ - Workload: workloadRes, - HealthStatuses: healthResList, - Destinations: destinationsRes, - WorkloadIdentity: worloadIdentityRes, + Workload: workloadRes, + HealthStatuses: healthResList, + Destinations: destinationsRes, + WorkloadIdentity: workloadIdentityRes, + ProxyConfiguration: proxyConfigRes, } } diff --git a/testing/deployer/sprawl/details.go b/testing/deployer/sprawl/details.go index b12fa02e5a45..a0c6c0c2a51e 100644 --- a/testing/deployer/sprawl/details.go +++ b/testing/deployer/sprawl/details.go @@ -72,7 +72,7 @@ func (s *Sprawl) PrintDetails() error { } else { ports := make(map[string]int) for name, port := range svc.Ports { - ports[name] = node.ExposedPort(port) + ports[name] = node.ExposedPort(port.Number) } cd.Apps = append(cd.Apps, appDetail{ Type: "app", diff --git a/testing/deployer/sprawl/internal/build/docker.go b/testing/deployer/sprawl/internal/build/docker.go index ac1976dad4ee..b8ca695f9b0a 100644 --- a/testing/deployer/sprawl/internal/build/docker.go +++ b/testing/deployer/sprawl/internal/build/docker.go @@ -35,6 +35,64 @@ USER 100:0 ENTRYPOINT [] ` +const dockerfileDataplaneForTProxy = ` +ARG DATAPLANE_IMAGE +ARG CONSUL_IMAGE +FROM ${CONSUL_IMAGE} AS consul +FROM ${DATAPLANE_IMAGE} AS distroless +FROM debian:bullseye-slim + +# undo the distroless aspect +COPY --from=distroless /usr/local/bin/discover /usr/local/bin/ +COPY --from=distroless /usr/local/bin/envoy /usr/local/bin/ +COPY --from=distroless /usr/local/bin/consul-dataplane /usr/local/bin/ +COPY --from=distroless /licenses/copyright.txt /licenses/ + +COPY --from=consul /bin/consul /bin/ + +# Install iptables and sudo, needed for tproxy. +RUN apt update -y \ + && apt install -y iptables sudo curl dnsutils + +RUN sed '/_apt/d' /etc/passwd > /etc/passwd.new \ + && mv -f /etc/passwd.new /etc/passwd \ + && adduser --uid=100 consul --no-create-home --disabled-password --system \ + && adduser consul sudo \ + && echo 'consul ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers + +COPY <<'EOF' /bin/tproxy-startup.sh +#!/bin/sh + +set -ex + +# HACK: UID of consul in the consul-client container +# This is conveniently also the UID of apt in the envoy container +CONSUL_UID=100 +ENVOY_UID=$(id -u) + +# - We allow 19000 so that the test can directly visit the envoy admin page. +# - We allow 20000 so that envoy can receive mTLS traffic from other nodes. +# - We (reluctantly) allow 8080 so that we can bypass envoy and talk to fortio +# to do test assertions. +sudo consul connect redirect-traffic \ + -proxy-uid $ENVOY_UID \ + -exclude-uid $CONSUL_UID \ + -proxy-inbound-port=15001 \ + -exclude-inbound-port=19000 \ + -exclude-inbound-port=20000 \ + -exclude-inbound-port=8080 +exec "$@" +EOF + +RUN chmod +x /bin/tproxy-startup.sh \ + && chown 100:0 /bin/tproxy-startup.sh + +RUN echo 'consul ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers + +USER 100:0 +ENTRYPOINT [] +` + func DockerImages( logger hclog.Logger, run *runner.Runner, @@ -80,6 +138,25 @@ func DockerImages( built[cdp] = struct{}{} } + + cdpTproxy := n.Images.LocalDataplaneTProxyImage() + if _, ok := built[cdpTproxy]; cdpTproxy != "" && !ok { + logger.Info("building image", "image", cdpTproxy) + err := run.DockerExec(context.TODO(), []string{ + "build", + "--build-arg", + "DATAPLANE_IMAGE=" + n.Images.Dataplane, + "--build-arg", + "CONSUL_IMAGE=" + n.Images.Consul, + "-t", cdpTproxy, + "-", + }, logw, strings.NewReader(dockerfileDataplaneForTProxy)) + if err != nil { + return err + } + + built[cdpTproxy] = struct{}{} + } } } diff --git a/testing/deployer/sprawl/internal/tfgen/dns.go b/testing/deployer/sprawl/internal/tfgen/dns.go index 20dca878ebf3..9b03693c8311 100644 --- a/testing/deployer/sprawl/internal/tfgen/dns.go +++ b/testing/deployer/sprawl/internal/tfgen/dns.go @@ -8,8 +8,11 @@ import ( "fmt" "os" "path/filepath" + "sort" "strings" + "golang.org/x/exp/maps" + "github.com/hashicorp/consul/testing/deployer/topology" "github.com/hashicorp/consul/testing/deployer/util" ) @@ -63,17 +66,36 @@ func (g *Generator) writeCoreDNSFiles(net *topology.Network, dnsIPAddress string } } + // Until Consul DNS understands v2, simulate it. + // + // NOTE: this DNS is not quite what consul normally does. It's simpler + // to simulate this format here. + virtualNames := make(map[string][]string) + for id, svcData := range cluster.Services { + if len(svcData.VirtualIps) == 0 { + continue + } + vips := svcData.VirtualIps + + // ----.virtual. + name := fmt.Sprintf("%s--%s--%s", id.Name, id.Namespace, id.Partition) + virtualNames[name] = vips + } + var ( clusterDNSName = cluster.Name + "-consulcluster.lan" - ) + virtualDNSName = "virtual.consul" - corefilePath := filepath.Join(rootdir, "Corefile") - zonefilePath := filepath.Join(rootdir, "servers") + corefilePath = filepath.Join(rootdir, "Corefile") + zonefilePath = filepath.Join(rootdir, "servers") + virtualZonefilePath = filepath.Join(rootdir, "virtual") + ) _, err := UpdateFileIfDifferent( g.logger, generateCoreDNSConfigFile( clusterDNSName, + virtualDNSName, addrs, ), corefilePath, @@ -105,7 +127,25 @@ func (g *Generator) writeCoreDNSFiles(net *topology.Network, dnsIPAddress string return false, nil, fmt.Errorf("error hashing %q: %w", zonefilePath, err) } - return true, []string{corefileHash, zonefileHash}, nil + _, err = UpdateFileIfDifferent( + g.logger, + generateCoreDNSVirtualZoneFile( + dnsIPAddress, + virtualDNSName, + virtualNames, + ), + virtualZonefilePath, + 0644, + ) + if err != nil { + return false, nil, fmt.Errorf("error writing %q: %w", virtualZonefilePath, err) + } + virtualZonefileHash, err := util.HashFile(virtualZonefilePath) + if err != nil { + return false, nil, fmt.Errorf("error hashing %q: %w", virtualZonefilePath, err) + } + + return true, []string{corefileHash, zonefileHash, virtualZonefileHash}, nil } return false, nil, nil @@ -113,6 +153,7 @@ func (g *Generator) writeCoreDNSFiles(net *topology.Network, dnsIPAddress string func generateCoreDNSConfigFile( clusterDNSName string, + virtualDNSName string, addrs []string, ) []byte { serverPart := "" @@ -139,7 +180,14 @@ consul:53 { whoami } -%[2]s +%[2]s:53 { + file /config/virtual %[2]s + log + errors + whoami +} + +%[3]s .:53 { forward . 8.8.8.8:53 @@ -147,7 +195,7 @@ consul:53 { errors whoami } -`, clusterDNSName, serverPart)) +`, clusterDNSName, virtualDNSName, serverPart)) } func generateCoreDNSZoneFile( @@ -178,3 +226,38 @@ server IN A %s ; Consul server return buf.Bytes() } + +func generateCoreDNSVirtualZoneFile( + dnsIPAddress string, + virtualDNSName string, + nameToAddr map[string][]string, +) []byte { + var buf bytes.Buffer + buf.WriteString(fmt.Sprintf(` +$TTL 60 +$ORIGIN %[1]s. +@ IN SOA ns.%[1]s. webmaster.%[1]s. ( + 2017042745 ; serial + 7200 ; refresh (2 hours) + 3600 ; retry (1 hour) + 1209600 ; expire (2 weeks) + 3600 ; minimum (1 hour) + ) +@ IN NS ns.%[1]s. ; Name server +ns IN A %[2]s ; self +`, virtualDNSName, dnsIPAddress)) + + names := maps.Keys(nameToAddr) + sort.Strings(names) + + for _, name := range names { + vips := nameToAddr[name] + for _, vip := range vips { + buf.WriteString(fmt.Sprintf(` +%s IN A %s ; Consul server +`, name, vip)) + } + } + + return buf.Bytes() +} diff --git a/testing/deployer/sprawl/internal/tfgen/gen.go b/testing/deployer/sprawl/internal/tfgen/gen.go index f64d7b6a254b..0f38714b0c99 100644 --- a/testing/deployer/sprawl/internal/tfgen/gen.go +++ b/testing/deployer/sprawl/internal/tfgen/gen.go @@ -122,8 +122,10 @@ func (s Step) String() string { } } -func (s Step) StartServers() bool { return s >= StepServers } -func (s Step) StartAgents() bool { return s >= StepAgents } +func (s Step) StartServers() bool { return s >= StepServers } + +func (s Step) StartAgents() bool { return s >= StepAgents } + func (s Step) StartServices() bool { return s >= StepServices } // func (s Step) InitiatePeering() bool { return s >= StepPeering } @@ -260,6 +262,7 @@ func (g *Generator) Generate(step Step) error { addImage("", node.Images.Consul) addImage("", node.Images.EnvoyConsulImage()) addImage("", node.Images.LocalDataplaneImage()) + addImage("", node.Images.LocalDataplaneTProxyImage()) if node.IsAgent() { addVolume(node.DockerName()) diff --git a/testing/deployer/sprawl/internal/tfgen/nodes.go b/testing/deployer/sprawl/internal/tfgen/nodes.go index 6f105b6f5ab7..4934482d7240 100644 --- a/testing/deployer/sprawl/internal/tfgen/nodes.go +++ b/testing/deployer/sprawl/internal/tfgen/nodes.go @@ -125,7 +125,11 @@ func (g *Generator) generateNodeContainers( var img string if node.IsDataplane() { tmpl = tfAppDataplaneT - img = DockerImageResourceName(node.Images.LocalDataplaneImage()) + if svc.EnableTransparentProxy { + img = DockerImageResourceName(node.Images.LocalDataplaneTProxyImage()) + } else { + img = DockerImageResourceName(node.Images.LocalDataplaneImage()) + } } else { img = DockerImageResourceName(node.Images.EnvoyConsulImage()) } diff --git a/testing/deployer/sprawl/internal/tfgen/templates/container-app-dataplane.tf.tmpl b/testing/deployer/sprawl/internal/tfgen/templates/container-app-dataplane.tf.tmpl index fa44090c800a..f706b6ad2d77 100644 --- a/testing/deployer/sprawl/internal/tfgen/templates/container-app-dataplane.tf.tmpl +++ b/testing/deployer/sprawl/internal/tfgen/templates/container-app-dataplane.tf.tmpl @@ -17,6 +17,13 @@ resource "docker_container" "{{.Node.DockerName}}-{{.Service.ID.TFString}}-sidec read_only = true } +{{ if .Service.EnableTransparentProxy }} + capabilities { + add = ["NET_ADMIN"] + } + entrypoint = [ "/bin/tproxy-startup.sh" ] +{{ end }} + env = [ "DP_CONSUL_ADDRESSES=server.{{.Node.Cluster}}-consulcluster.lan", {{ if .Node.IsV2 }} @@ -39,6 +46,10 @@ resource "docker_container" "{{.Node.DockerName}}-{{.Service.ID.TFString}}-sidec "DP_CREDENTIAL_STATIC_TOKEN={{.Token}}", {{ end }} +{{ if .Service.EnableTransparentProxy }} + "REDIRECT_TRAFFIC_ARGS=-exclude-inbound-port=19000", +{{ end }} + // for demo purposes "DP_ENVOY_ADMIN_BIND_ADDRESS=0.0.0.0", "DP_ENVOY_ADMIN_BIND_PORT=19000", diff --git a/testing/deployer/topology/compile.go b/testing/deployer/topology/compile.go index c9249d43ed41..beaace3e145b 100644 --- a/testing/deployer/topology/compile.go +++ b/testing/deployer/topology/compile.go @@ -317,6 +317,13 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error return nil, fmt.Errorf("cluster %q node %q has more than one public address", c.Name, n.Name) } + if n.IsDataplane() && len(n.Services) > 1 { + // Our use of consul-dataplane here is supposed to mimic that + // of consul-k8s, which ultimately has one IP per Service, so + // we introduce the same limitation here. + return nil, fmt.Errorf("cluster %q node %q uses dataplane, but has more than one service", c.Name, n.Name) + } + seenServices := make(map[ServiceID]struct{}) for _, svc := range n.Services { if n.IsAgent() { @@ -387,7 +394,7 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error // return nil, fmt.Errorf("service has invalid protocol: %s", svc.Protocol) // } - for _, u := range svc.Upstreams { + defaultUpstream := func(u *Upstream) error { // Default to that of the enclosing service. if u.Peer == "" { if u.ID.Partition == "" { @@ -406,17 +413,43 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error addTenancy(u.ID.Partition, u.ID.Namespace) - if u.LocalAddress == "" { - // v1 defaults to 127.0.0.1 but v2 does not. Safe to do this generally though. - u.LocalAddress = "127.0.0.1" + if u.Implied { + if u.PortName == "" { + return fmt.Errorf("implicit upstreams must use port names in v2") + } + } else { + if u.LocalAddress == "" { + // v1 defaults to 127.0.0.1 but v2 does not. Safe to do this generally though. + u.LocalAddress = "127.0.0.1" + } + if u.PortName != "" && n.IsV1() { + return fmt.Errorf("explicit upstreams cannot use port names in v1") + } + if u.PortName == "" && n.IsV2() { + // Assume this is a v1->v2 conversion and name it. + u.PortName = "legacy" + } } - if u.PortName != "" && n.IsV1() { - return nil, fmt.Errorf("explicit upstreams cannot use port names in v1") + return nil + } + + for _, u := range svc.Upstreams { + if err := defaultUpstream(u); err != nil { + return nil, err } - if u.PortName == "" && n.IsV2() { - // Assume this is a v1->v2 conversion and name it. - u.PortName = "legacy" + } + + if n.IsV2() { + for _, u := range svc.ImpliedUpstreams { + u.Implied = true + if err := defaultUpstream(u); err != nil { + return nil, err + } + } + } else { + if len(svc.ImpliedUpstreams) > 0 { + return nil, fmt.Errorf("v1 does not support implied upstreams yet") } } @@ -424,31 +457,36 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error return nil, fmt.Errorf("cluster %q node %q service %q is not valid: %w", c.Name, n.Name, svc.ID.String(), err) } + if svc.EnableTransparentProxy && !n.IsDataplane() { + return nil, fmt.Errorf("cannot enable tproxy on a non-dataplane node") + } + if n.IsV2() { if implicitV2Services { svc.V2Services = []string{svc.ID.Name} var svcPorts []*pbcatalog.ServicePort - for name := range svc.Ports { + for name, cfg := range svc.Ports { svcPorts = append(svcPorts, &pbcatalog.ServicePort{ TargetPort: name, - Protocol: pbcatalog.Protocol_PROTOCOL_TCP, // TODO - }) - } - if !svc.DisableServiceMesh { - svcPorts = append(svcPorts, &pbcatalog.ServicePort{ - TargetPort: "mesh", Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + Protocol: cfg.ActualProtocol, }) } v2svc := &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{ - Names: []string{svc.Workload}, - }, - Ports: svcPorts, + Workloads: &pbcatalog.WorkloadSelector{}, + Ports: svcPorts, } - c.Services[svc.ID] = v2svc + prev, ok := c.Services[svc.ID] + if !ok { + c.Services[svc.ID] = v2svc + prev = v2svc + } + if prev.Workloads == nil { + prev.Workloads = &pbcatalog.WorkloadSelector{} + } + prev.Workloads.Names = append(prev.Workloads.Names, svc.Workload) } else { for _, name := range svc.V2Services { @@ -466,20 +504,45 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error } } - if len(svc.WorkloadIdentities) == 0 { - svc.WorkloadIdentities = []string{svc.ID.Name} + if svc.WorkloadIdentity == "" { + svc.WorkloadIdentity = svc.ID.Name } } else { if len(svc.V2Services) > 0 { return nil, fmt.Errorf("cannot specify v2 services for v1") } - if len(svc.WorkloadIdentities) > 0 { + if svc.WorkloadIdentity != "" { return nil, fmt.Errorf("cannot specify workload identities for v1") } } } } + if err := assignVirtualIPs(c); err != nil { + return nil, err + } + + if c.EnableV2 { + // Populate the VirtualPort field on all implied upstreams. + for _, n := range c.Nodes { + for _, svc := range n.Services { + for _, u := range svc.ImpliedUpstreams { + res, ok := c.Services[u.ID] + if ok { + for _, sp := range res.Ports { + if sp.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + continue + } + if sp.TargetPort == u.PortName { + u.VirtualPort = sp.VirtualPort + } + } + } + } + } + } + } + // Explode this into the explicit list based on stray references made. c.Partitions = nil for ap, nsMap := range tenancies { @@ -605,6 +668,21 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error // this helps in generating fortio assertions; otherwise field is ignored u.ID.Partition = remotePeer.Link.Partition } + for _, u := range svc.ImpliedUpstreams { + if u.Peer == "" { + u.Cluster = c.Name + u.Peering = nil + continue + } + remotePeer, ok := c.Peerings[u.Peer] + if !ok { + return nil, fmt.Errorf("not possible") + } + u.Cluster = remotePeer.Link.Name + u.Peering = remotePeer.Link + // this helps in generating fortio assertions; otherwise field is ignored + u.ID.Partition = remotePeer.Link.Partition + } } } } @@ -671,6 +749,51 @@ func compile(logger hclog.Logger, raw *Config, prev *Topology) (*Topology, error return t, nil } +func assignVirtualIPs(c *Cluster) error { + lastVIPIndex := 1 + for _, svcData := range c.Services { + lastVIPIndex++ + if lastVIPIndex > 250 { + return fmt.Errorf("too many ips using this approach to VIPs") + } + svcData.VirtualIps = []string{ + fmt.Sprintf("10.244.0.%d", lastVIPIndex), + } + + // populate virtual ports where we forgot them + var ( + usedPorts = make(map[uint32]struct{}) + next = uint32(8080) + ) + for _, sp := range svcData.Ports { + if sp.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + continue + } + if sp.VirtualPort > 0 { + usedPorts[sp.VirtualPort] = struct{}{} + } + } + for _, sp := range svcData.Ports { + if sp.Protocol == pbcatalog.Protocol_PROTOCOL_MESH { + continue + } + if sp.VirtualPort > 0 { + continue + } + RETRY: + attempt := next + next++ + _, used := usedPorts[attempt] + if used { + goto RETRY + } + usedPorts[attempt] = struct{}{} + sp.VirtualPort = attempt + } + } + return nil +} + const permutedWarning = "use the disabled node kind if you want to ignore a node" func inheritAndValidateNodes( diff --git a/testing/deployer/topology/images.go b/testing/deployer/topology/images.go index 7adb8d3f7ee6..318154c5822c 100644 --- a/testing/deployer/topology/images.go +++ b/testing/deployer/topology/images.go @@ -38,13 +38,21 @@ func (i Images) LocalDataplaneImage() string { return "local/" + name + ":" + tag } +func (i Images) LocalDataplaneTProxyImage() string { + return spliceImageNamesAndTags(i.Dataplane, i.Consul, "tproxy") +} + func (i Images) EnvoyConsulImage() string { - if i.Consul == "" || i.Envoy == "" { + return spliceImageNamesAndTags(i.Consul, i.Envoy, "") +} + +func spliceImageNamesAndTags(base1, base2, nameSuffix string) string { + if base1 == "" || base2 == "" { return "" } - img1, tag1, ok1 := strings.Cut(i.Consul, ":") - img2, tag2, ok2 := strings.Cut(i.Envoy, ":") + img1, tag1, ok1 := strings.Cut(base1, ":") + img2, tag2, ok2 := strings.Cut(base2, ":") if !ok1 { tag1 = "latest" } @@ -66,8 +74,12 @@ func (i Images) EnvoyConsulImage() string { name2 = repo2 } + if nameSuffix != "" { + nameSuffix = "-" + nameSuffix + } + // ex: local/hashicorp-consul-and-envoyproxy-envoy:1.15.0-with-v1.26.2 - return "local/" + name1 + "-and-" + name2 + ":" + tag1 + "-with-" + tag2 + return "local/" + name1 + "-and-" + name2 + nameSuffix + ":" + tag1 + "-with-" + tag2 } // TODO: what is this for and why do we need to do this and why is it named this? diff --git a/testing/deployer/topology/relationships.go b/testing/deployer/topology/relationships.go index 57c075b77f0f..8448451f3f07 100644 --- a/testing/deployer/topology/relationships.go +++ b/testing/deployer/topology/relationships.go @@ -22,6 +22,12 @@ func (t *Topology) ComputeRelationships() []Relationship { Upstream: u, }) } + for _, u := range s.ImpliedUpstreams { + out = append(out, Relationship{ + Caller: s, + Upstream: u, + }) + } } } } @@ -35,6 +41,10 @@ func RenderRelationships(ships []Relationship) string { w := tabwriter.NewWriter(&buf, 0, 0, 3, ' ', tabwriter.Debug) fmt.Fprintf(w, "DOWN\tnode\tservice\tport\tUP\tservice\t\n") for _, r := range ships { + suffix := "" + if r.Upstream.Implied { + suffix = " (implied)" + } fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%s\t%s\t\n", r.downCluster(), @@ -42,7 +52,7 @@ func RenderRelationships(ships []Relationship) string { r.Caller.ID.String(), r.Upstream.LocalPort, r.upCluster(), - r.Upstream.ID.String(), + r.Upstream.ID.String()+suffix, ) } fmt.Fprintf(w, "\t\t\t\t\t\t\n") @@ -57,14 +67,19 @@ type Relationship struct { } func (r Relationship) String() string { + suffix := "" + if r.Upstream.PortName != "" { + suffix = " port " + r.Upstream.PortName + } return fmt.Sprintf( - "%s on %s in %s via :%d => %s in %s", + "%s on %s in %s via :%d => %s in %s%s", r.Caller.ID.String(), r.Caller.Node.ID().String(), r.downCluster(), r.Upstream.LocalPort, r.Upstream.ID.String(), r.upCluster(), + suffix, ) } diff --git a/testing/deployer/topology/topology.go b/testing/deployer/topology/topology.go index 48edf5629626..b59045b564b4 100644 --- a/testing/deployer/topology/topology.go +++ b/testing/deployer/topology/topology.go @@ -10,6 +10,7 @@ import ( "net/netip" "reflect" "sort" + "strings" "github.com/hashicorp/consul/api" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" @@ -717,6 +718,32 @@ type ServiceAndNode struct { Node *Node } +// Protocol is a convenience function to use when authoring topology configs. +func Protocol(s string) (pbcatalog.Protocol, bool) { + switch strings.ToLower(s) { + case "tcp": + return pbcatalog.Protocol_PROTOCOL_TCP, true + case "http": + return pbcatalog.Protocol_PROTOCOL_HTTP, true + case "http2": + return pbcatalog.Protocol_PROTOCOL_HTTP2, true + case "grpc": + return pbcatalog.Protocol_PROTOCOL_GRPC, true + case "mesh": + return pbcatalog.Protocol_PROTOCOL_MESH, true + default: + return pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, false + } +} + +type Port struct { + Number int + Protocol string `json:",omitempty"` + + // denormalized at topology compile + ActualProtocol pbcatalog.Protocol `json:",omitempty"` +} + // TODO(rb): really this should now be called "workload" or "instance" type Service struct { ID ServiceID @@ -728,15 +755,7 @@ type Service struct { // Ports is the v2 multi-port list for this service. // // This only applies for multi-port (v2). - Ports map[string]int `json:",omitempty"` - - // ExposedPort is the exposed docker port corresponding to 'Port'. - ExposedPort int `json:",omitempty"` - - // ExposedPorts are the exposed docker ports corresponding to 'Ports'. - // - // This only applies for multi-port (v2). - ExposedPorts map[string]int `json:",omitempty"` + Ports map[string]*Port `json:",omitempty"` // V2Services contains service names (which are merged with the tenancy // info from ID) to resolve services in the Services slice in the Cluster @@ -748,14 +767,14 @@ type Service struct { // This only applies for multi-port (v2). V2Services []string `json:",omitempty"` - // WorkloadIdentities contains named WorkloadIdentities to assign to this + // WorkloadIdentity contains named WorkloadIdentity to assign to this // workload. // // If omitted it is inferred that the ID.Name field is the singular // identity for this workload. // // This only applies for multi-port (v2). - WorkloadIdentities []string `json:",omitempty"` + WorkloadIdentity string `json:",omitempty"` Disabled bool `json:",omitempty"` // TODO @@ -774,9 +793,11 @@ type Service struct { Command []string `json:",omitempty"` // optional Env []string `json:",omitempty"` // optional - DisableServiceMesh bool `json:",omitempty"` - IsMeshGateway bool `json:",omitempty"` - Upstreams []*Upstream + EnableTransparentProxy bool `json:",omitempty"` + DisableServiceMesh bool `json:",omitempty"` + IsMeshGateway bool `json:",omitempty"` + Upstreams []*Upstream `json:",omitempty"` + ImpliedUpstreams []*Upstream `json:",omitempty"` // denormalized at topology compile Node *Node `json:"-"` @@ -784,9 +805,28 @@ type Service struct { Workload string `json:"-"` } +func (s *Service) ExposedPort(name string) int { + if s.Node == nil { + panic("ExposedPort cannot be called until after Compile") + } + + var internalPort int + if name == "" { + internalPort = s.Port + } else { + port, ok := s.Ports[name] + if !ok { + panic("port with name " + name + " not present on service") + } + internalPort = port.Number + } + + return s.Node.ExposedPort(internalPort) +} + func (s *Service) PortOrDefault(name string) int { if len(s.Ports) > 0 { - return s.Ports[name] + return s.Ports[name].Number } return s.Port } @@ -800,8 +840,6 @@ func (s *Service) IsV1() bool { } func (s *Service) inheritFromExisting(existing *Service) { - s.ExposedPort = existing.ExposedPort - s.ExposedPorts = existing.ExposedPorts s.ExposedEnvoyAdminPort = existing.ExposedEnvoyAdminPort } @@ -810,10 +848,10 @@ func (s *Service) ports() []int { if len(s.Ports) > 0 { seen := make(map[int]struct{}) for _, port := range s.Ports { - if _, ok := seen[port]; !ok { + if _, ok := seen[port.Number]; !ok { // It's totally fine to expose the same port twice in a workload. - seen[port] = struct{}{} - out = append(out, port) + seen[port.Number] = struct{}{} + out = append(out, port.Number) } } } else if s.Port > 0 { @@ -838,7 +876,6 @@ func (s *Service) HasCheck() bool { } func (s *Service) DigestExposedPorts(ports map[int]int) { - s.ExposedPort = ports[s.Port] if s.EnvoyAdminPort > 0 { s.ExposedEnvoyAdminPort = ports[s.EnvoyAdminPort] } else { @@ -858,14 +895,38 @@ func (s *Service) Validate() error { return fmt.Errorf("cannot specify both singleport and multiport on service in v2") } if s.Port > 0 { - s.Ports = map[string]int{"legacy": s.Port} + s.Ports = map[string]*Port{ + "legacy": { + Number: s.Port, + Protocol: "tcp", + }, + } s.Port = 0 } + if !s.DisableServiceMesh && s.EnvoyPublicListenerPort > 0 { + s.Ports["mesh"] = &Port{ + Number: s.EnvoyPublicListenerPort, + Protocol: "mesh", + } + } + for name, port := range s.Ports { - if port <= 0 { - return fmt.Errorf("service has invalid port %q", name) + if port == nil { + return fmt.Errorf("cannot be nil") + } + if port.Number <= 0 { + return fmt.Errorf("service has invalid port number %q", name) } + if port.ActualProtocol != pbcatalog.Protocol_PROTOCOL_UNSPECIFIED { + return fmt.Errorf("user cannot specify ActualProtocol field") + } + + proto, valid := Protocol(port.Protocol) + if !valid { + return fmt.Errorf("service has invalid port protocol %q", port.Protocol) + } + port.ActualProtocol = proto } } else { if len(s.Ports) > 0 { @@ -874,6 +935,9 @@ func (s *Service) Validate() error { if s.Port <= 0 { return fmt.Errorf("service has invalid port") } + if s.EnableTransparentProxy { + return fmt.Errorf("tproxy does not work with v1 yet") + } } if s.DisableServiceMesh && s.IsMeshGateway { return fmt.Errorf("cannot disable service mesh and still run a mesh gateway") @@ -881,6 +945,12 @@ func (s *Service) Validate() error { if s.DisableServiceMesh && len(s.Upstreams) > 0 { return fmt.Errorf("cannot disable service mesh and configure upstreams") } + if s.DisableServiceMesh && len(s.ImpliedUpstreams) > 0 { + return fmt.Errorf("cannot disable service mesh and configure implied upstreams") + } + if s.DisableServiceMesh && s.EnableTransparentProxy { + return fmt.Errorf("cannot disable service mesh and activate tproxy") + } if s.DisableServiceMesh { if s.EnvoyAdminPort != 0 { @@ -906,6 +976,20 @@ func (s *Service) Validate() error { return fmt.Errorf("upstream local address is invalid: %s", u.LocalAddress) } } + if u.Implied { + return fmt.Errorf("implied field cannot be set") + } + } + for _, u := range s.ImpliedUpstreams { + if u.ID.Name == "" { + return fmt.Errorf("implied upstream service name is required") + } + if u.LocalPort > 0 { + return fmt.Errorf("implied upstream local port cannot be set") + } + if u.LocalAddress != "" { + return fmt.Errorf("implied upstream local address cannot be set") + } } return nil @@ -924,8 +1008,10 @@ type Upstream struct { // TODO: what about mesh gateway mode overrides? // computed at topology compile - Cluster string `json:",omitempty"` - Peering *PeerCluster `json:",omitempty"` // this will have Link!=nil + Cluster string `json:",omitempty"` + Peering *PeerCluster `json:",omitempty"` // this will have Link!=nil + Implied bool `json:",omitempty"` + VirtualPort uint32 `json:",omitempty"` } type Peering struct {