Skip to content

Commit

Permalink
feat(MeshHTTPRoute): initial plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Beaumont <mjboamail@gmail.com>
  • Loading branch information
michaelbeaumont committed Dec 23, 2022
1 parent 7be3609 commit 6c0b842
Show file tree
Hide file tree
Showing 9 changed files with 619 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (

// MeshHTTPRoute
// +kuma:policy:skip_registration=true
//
// This policy defines its own `GetDefault` method so that it can have the given
// structure for deserialization but still use the generic policy merging
// machinery.
//
// +kuma:policy:skip_get_default=true
type MeshHTTPRoute struct {
// TargetRef is a reference to the resource the policy takes an effect on.
Expand Down
12 changes: 12 additions & 0 deletions pkg/plugins/policies/meshhttproute/common/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package common

import (
api "github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/api/v1alpha1"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
)

type Route struct {
Matches []api.Match
Filters []api.Filter
BackendRefs []envoy_common.Cluster
}
99 changes: 99 additions & 0 deletions pkg/plugins/policies/meshhttproute/plugin/v1alpha1/clusters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package v1alpha1

import (
"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
envoy_clusters "github.com/kumahq/kuma/pkg/xds/envoy/clusters"
envoy_tags "github.com/kumahq/kuma/pkg/xds/envoy/tags"
"github.com/kumahq/kuma/pkg/xds/generator"
)

func generateClusters(
proxy *core_xds.Proxy,
meshCtx xds_context.MeshContext,
services envoy_common.Services,
) (*core_xds.ResourceSet, error) {
resources := core_xds.NewResourceSet()

for _, serviceName := range services.Sorted() {
service := services[serviceName]
protocol := generator.InferProtocol(proxy, service.Clusters())
tlsReady := service.TLSReady()

for _, cluster := range service.Clusters() {
edsClusterBuilder := envoy_clusters.NewClusterBuilder(proxy.APIVersion)

clusterName := cluster.Name()
clusterTags := []envoy_tags.Tags{cluster.Tags()}

if service.HasExternalService() {
if meshCtx.Resource.ZoneEgressEnabled() {
edsClusterBuilder.
Configure(envoy_clusters.EdsCluster(clusterName)).
Configure(envoy_clusters.ClientSideMTLS(
proxy.SecretsTracker,
meshCtx.Resource,
mesh_proto.ZoneEgressServiceName,
tlsReady,
clusterTags,
))
} else {
endpoints := proxy.Routing.ExternalServiceOutboundTargets[serviceName]
isIPv6 := proxy.Dataplane.IsIPv6()

edsClusterBuilder.
Configure(envoy_clusters.ProvidedEndpointCluster(clusterName, isIPv6, endpoints...)).
Configure(envoy_clusters.ClientSideTLS(endpoints))
}

switch protocol {
case core_mesh.ProtocolHTTP:
edsClusterBuilder.Configure(envoy_clusters.Http())
case core_mesh.ProtocolHTTP2, core_mesh.ProtocolGRPC:
edsClusterBuilder.Configure(envoy_clusters.Http2())
default:
}
} else {
edsClusterBuilder.
Configure(envoy_clusters.EdsCluster(clusterName)).
Configure(envoy_clusters.LB(cluster.LB())).
Configure(envoy_clusters.Http2())

if upstreamMeshName := cluster.Mesh(); upstreamMeshName != "" {
for _, otherMesh := range append(meshCtx.Resources.OtherMeshes().Items, meshCtx.Resource) {
if otherMesh.GetMeta().GetName() == upstreamMeshName {
edsClusterBuilder.Configure(
envoy_clusters.CrossMeshClientSideMTLS(
proxy.SecretsTracker, meshCtx.Resource, otherMesh, serviceName, tlsReady, clusterTags,
),
)
break
}
}
} else {
edsClusterBuilder.Configure(envoy_clusters.ClientSideMTLS(
proxy.SecretsTracker,
meshCtx.Resource, serviceName, tlsReady, clusterTags))
}
}

edsCluster, err := edsClusterBuilder.Build()
if err != nil {
return nil, errors.Wrapf(err, "build CDS for cluster %s failed", clusterName)
}

resources = resources.Add(&core_xds.Resource{
Name: clusterName,
Origin: generator.OriginOutbound,
Resource: edsCluster,
})
}
}

return resources, nil
}
48 changes: 48 additions & 0 deletions pkg/plugins/policies/meshhttproute/plugin/v1alpha1/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package v1alpha1

import (
"context"

"github.com/pkg/errors"

"github.com/kumahq/kuma/pkg/core/user"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
)

func generateEndpoints(
proxy *core_xds.Proxy,
ctx xds_context.Context,
services envoy_common.Services,
) (*core_xds.ResourceSet, error) {
resources := core_xds.NewResourceSet()

for _, serviceName := range services.Sorted() {
// When no zone egress is present in a mesh Endpoints for ExternalServices
// are specified in load assignment in DNS Cluster.
// We are not allowed to add endpoints with DNS names through EDS.
if !services[serviceName].HasExternalService() || ctx.Mesh.Resource.ZoneEgressEnabled() {
for _, cluster := range services[serviceName].Clusters() {
var endpoints core_xds.EndpointMap
if cluster.Mesh() != "" {
endpoints = ctx.Mesh.CrossMeshEndpoints[cluster.Mesh()]
} else {
endpoints = ctx.Mesh.EndpointMap
}

loadAssignment, err := ctx.ControlPlane.CLACache.GetCLA(user.Ctx(context.TODO(), user.ControlPlane), proxy.Dataplane.GetMeta().GetMesh(), ctx.Mesh.Hash, cluster, proxy.APIVersion, endpoints)
if err != nil {
return nil, errors.Wrapf(err, "could not get ClusterLoadAssignment for %s", serviceName)
}

resources.Add(&core_xds.Resource{
Name: cluster.Name(),
Resource: loadAssignment,
})
}
}
}

return resources, nil
}
156 changes: 156 additions & 0 deletions pkg/plugins/policies/meshhttproute/plugin/v1alpha1/listeners.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package v1alpha1

import (
"fmt"

common_api "github.com/kumahq/kuma/api/common/v1alpha1"
mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/api/v1alpha1"
xds "github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/xds"
"github.com/kumahq/kuma/pkg/xds/envoy"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
envoy_listeners "github.com/kumahq/kuma/pkg/xds/envoy/listeners"
envoy_names "github.com/kumahq/kuma/pkg/xds/envoy/names"
envoy_tags "github.com/kumahq/kuma/pkg/xds/envoy/tags"
"github.com/kumahq/kuma/pkg/xds/generator"
)

func generateListeners(
proxy *core_xds.Proxy,
rules []ToRouteRule,
servicesAcc envoy_common.ServicesAccumulator,
) (*core_xds.ResourceSet, error) {
resources := core_xds.NewResourceSet()
splitCounter := &splitCounter{}

for _, outbound := range proxy.Dataplane.Spec.GetNetworking().GetOutbound() {
serviceName := outbound.GetTagsIncludingLegacy()[mesh_proto.ServiceTag]
oface := proxy.Dataplane.Spec.Networking.ToOutboundInterface(outbound)
outboundListenerName := envoy_names.GetOutboundListenerName(oface.DataplaneIP, oface.DataplanePort)

listenerBuilder := envoy_listeners.NewListenerBuilder(proxy.APIVersion).
Configure(envoy_listeners.OutboundListener(outboundListenerName, oface.DataplaneIP, oface.DataplanePort, core_xds.SocketAddressProtocolTCP)).
Configure(envoy_listeners.TransparentProxying(proxy.Dataplane.Spec.Networking.GetTransparentProxying())).
Configure(envoy_listeners.TagsMetadata(envoy_tags.Tags(outbound.GetTagsIncludingLegacy()).WithoutTags(mesh_proto.MeshTag)))

for _, route := range FindRoutes(rules, serviceName) {
clusters := makeClusters(proxy, map[string]string{}, splitCounter, route.BackendRefs)
servicesAcc.Add(clusters...)

filterChainBuilder := envoy_listeners.NewFilterChainBuilder(proxy.APIVersion)

serviceName := outbound.GetTagsIncludingLegacy()[mesh_proto.ServiceTag]
configurer := &xds.HttpOutboundRouteConfigurer{
Service: serviceName,
Matches: route.Matches,
Filters: route.Filters,
Clusters: clusters,
DpTags: proxy.Dataplane.Spec.TagSet(),
}
filterChainBuilder.
Configure(envoy_listeners.HttpConnectionManager(serviceName, false)).
Configure(envoy_listeners.AddFilterChainConfigurer(configurer))

listenerBuilder = listenerBuilder.Configure(envoy_listeners.FilterChain(filterChainBuilder))
}

listener, err := listenerBuilder.Build()
if err != nil {
return nil, err
}
resources.Add(&core_xds.Resource{
Name: listener.GetName(),
Origin: generator.OriginOutbound,
Resource: listener,
})
}

return resources, nil
}

// Whenever `split` is specified in the TrafficRoute which has more than kuma.io/service tag
// We generate a separate Envoy cluster with _X_ suffix. SplitCounter ensures that we have different X for every split in one Dataplane
// Each split is distinct for the whole Dataplane so we can avoid accidental cluster overrides.
type splitCounter struct {
counter int
}

func (s *splitCounter) getAndIncrement() int {
counter := s.counter
s.counter++
return counter
}

func makeClusters(
proxy *core_xds.Proxy,
clusterCache map[string]string,
splitCounter *splitCounter,
refs []api.BackendRef,
) []envoy.Cluster {
var clustersInternal, clustersExternal []envoy.Cluster

for _, ref := range refs {
switch ref.Kind {
case common_api.MeshService, common_api.MeshServiceSubset:
default:
continue
}

service := ref.Name
if ref.Weight == 0 {
continue
}

name := service

if len(ref.Tags) > 0 {
name = envoy_names.GetSplitClusterName(service, splitCounter.getAndIncrement())
}

if mesh, ok := ref.Tags[mesh_proto.MeshTag]; ok {
// The name should be distinct to the service & mesh combination
name = fmt.Sprintf("%s_%s", name, mesh)
}

// We assume that all the targets are either ExternalServices or not
// therefore we check only the first one
var isExternalService bool
if endpoints := proxy.Routing.OutboundTargets[service]; len(endpoints) > 0 {
isExternalService = endpoints[0].IsExternalService()
}
if endpoints := proxy.Routing.ExternalServiceOutboundTargets[service]; len(endpoints) > 0 {
isExternalService = true
}

allTags := envoy_tags.Tags(ref.Tags).WithTags(mesh_proto.ServiceTag, ref.Name)
cluster := envoy_common.NewCluster(
envoy_common.WithService(service),
envoy_common.WithName(name),
envoy_common.WithWeight(uint32(ref.Weight)),
// The mesh tag is set here if this destination is generated
// from a MeshGateway virtual outbound and is not part of the
// service tags
envoy_common.WithTags(allTags.WithoutTags(mesh_proto.MeshTag)),
envoy_common.WithExternalService(isExternalService),
)

if mesh, ok := ref.Tags[mesh_proto.MeshTag]; ok {
cluster.SetMesh(mesh)
}

if name, ok := clusterCache[allTags.String()]; ok {
cluster.SetName(name)
} else {
clusterCache[allTags.String()] = cluster.Name()
}

if isExternalService {
clustersExternal = append(clustersExternal, cluster)
} else {
clustersInternal = append(clustersInternal, cluster)
}
}

return append(clustersExternal, clustersInternal...)
}
Loading

0 comments on commit 6c0b842

Please sign in to comment.