Skip to content

Commit

Permalink
[API Gateway] WAN Federation test and fixes (#2295)
Browse files Browse the repository at this point in the history
* [API Gateway] WAN Federation test and fixes

* Fix unit tests
  • Loading branch information
Andrew Stucki authored and absolutelightning committed Aug 4, 2023
1 parent e19df27 commit f20fb7a
Show file tree
Hide file tree
Showing 13 changed files with 427 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

resources:
- serviceresolver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

apiVersion: consul.hashicorp.com/v1alpha1
kind: ServiceResolver
metadata:
name: static-server
spec:
redirect:
service: static-server
datacenter: dc2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

resources:
- serviceresolver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

apiVersion: consul.hashicorp.com/v1alpha1
kind: ServiceResolver
metadata:
name: static-server
spec:
redirect:
service: static-server
datacenter: dc1
241 changes: 241 additions & 0 deletions acceptance/tests/wan-federation/wan_federation_gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package wanfederation

import (
"context"
"fmt"
"testing"
"time"

"github.com/hashicorp/consul-k8s/acceptance/framework/consul"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
"github.com/hashicorp/consul-k8s/acceptance/framework/helpers"
"github.com/hashicorp/consul-k8s/acceptance/framework/k8s"
"github.com/hashicorp/consul-k8s/acceptance/framework/logger"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/serf/testutil/retry"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

func TestWANFederation_Gateway(t *testing.T) {
env := suite.Environment()
cfg := suite.Config()

if cfg.UseKind {
// the only way this test can currently run on kind, at least on a Mac, is via leveraging MetalLB, which
// isn't in CI, so we just skip for now.
t.Skipf("skipping wan federation tests as they currently fail on Kind even though they work on other clouds.")
}

primaryContext := env.DefaultContext(t)
secondaryContext := env.Context(t, environment.SecondaryContextName)

primaryHelmValues := map[string]string{
"global.datacenter": "dc1",

"global.tls.enabled": "true",
"global.tls.httpsOnly": "true",

"global.federation.enabled": "true",
"global.federation.createFederationSecret": "true",

"global.acls.manageSystemACLs": "true",
"global.acls.createReplicationToken": "true",

"connectInject.enabled": "true",
"connectInject.replicas": "1",

"meshGateway.enabled": "true",
"meshGateway.replicas": "1",
}

releaseName := helpers.RandomName()

// Install the primary consul cluster in the default kubernetes context
primaryConsulCluster := consul.NewHelmCluster(t, primaryHelmValues, primaryContext, cfg, releaseName)
primaryConsulCluster.Create(t)

// Get the federation secret from the primary cluster and apply it to secondary cluster
federationSecretName := fmt.Sprintf("%s-consul-federation", releaseName)
logger.Logf(t, "retrieving federation secret %s from the primary cluster and applying to the secondary", federationSecretName)
federationSecret, err := primaryContext.KubernetesClient(t).CoreV1().Secrets(primaryContext.KubectlOptions(t).Namespace).Get(context.Background(), federationSecretName, metav1.GetOptions{})
require.NoError(t, err)
federationSecret.ResourceVersion = ""
_, err = secondaryContext.KubernetesClient(t).CoreV1().Secrets(secondaryContext.KubectlOptions(t).Namespace).Create(context.Background(), federationSecret, metav1.CreateOptions{})
require.NoError(t, err)

var k8sAuthMethodHost string
// When running on kind, the kube API address in kubeconfig will have a localhost address
// which will not work from inside the container. That's why we need to use the endpoints address instead
// which will point the node IP.
if cfg.UseKind {
// The Kubernetes AuthMethod host is read from the endpoints for the Kubernetes service.
kubernetesEndpoint, err := secondaryContext.KubernetesClient(t).CoreV1().Endpoints("default").Get(context.Background(), "kubernetes", metav1.GetOptions{})
require.NoError(t, err)
k8sAuthMethodHost = fmt.Sprintf("%s:%d", kubernetesEndpoint.Subsets[0].Addresses[0].IP, kubernetesEndpoint.Subsets[0].Ports[0].Port)
} else {
k8sAuthMethodHost = k8s.KubernetesAPIServerHostFromOptions(t, secondaryContext.KubectlOptions(t))
}

// Create secondary cluster
secondaryHelmValues := map[string]string{
"global.datacenter": "dc2",

"global.tls.enabled": "true",
"global.tls.httpsOnly": "false",
"global.acls.manageSystemACLs": "true",
"global.tls.caCert.secretName": federationSecretName,
"global.tls.caCert.secretKey": "caCert",
"global.tls.caKey.secretName": federationSecretName,
"global.tls.caKey.secretKey": "caKey",

"global.federation.enabled": "true",

"server.extraVolumes[0].type": "secret",
"server.extraVolumes[0].name": federationSecretName,
"server.extraVolumes[0].load": "true",
"server.extraVolumes[0].items[0].key": "serverConfigJSON",
"server.extraVolumes[0].items[0].path": "config.json",

"connectInject.enabled": "true",
"connectInject.replicas": "1",

"meshGateway.enabled": "true",
"meshGateway.replicas": "1",

"global.acls.replicationToken.secretName": federationSecretName,
"global.acls.replicationToken.secretKey": "replicationToken",
"global.federation.k8sAuthMethodHost": k8sAuthMethodHost,
"global.federation.primaryDatacenter": "dc1",
}

// Install the secondary consul cluster in the secondary kubernetes context
secondaryConsulCluster := consul.NewHelmCluster(t, secondaryHelmValues, secondaryContext, cfg, releaseName)
secondaryConsulCluster.Create(t)

primaryClient, _ := primaryConsulCluster.SetupConsulClient(t, true)
secondaryClient, _ := secondaryConsulCluster.SetupConsulClient(t, true)

// Verify federation between servers
logger.Log(t, "verifying federation was successful")
helpers.VerifyFederation(t, primaryClient, secondaryClient, releaseName, true)

// Create a ProxyDefaults resource to configure services to use the mesh
// gateways.
logger.Log(t, "creating proxy-defaults config in dc1")
kustomizeDir := "../fixtures/cases/api-gateways/mesh"
k8s.KubectlApplyK(t, primaryContext.KubectlOptions(t), kustomizeDir)
helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
k8s.KubectlDeleteK(t, primaryContext.KubectlOptions(t), kustomizeDir)
})

// these clients are just there so we can exec in and curl on them.
logger.Log(t, "creating static-client in dc1")
k8s.DeployKustomize(t, primaryContext.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-client-multi-dc")

logger.Log(t, "creating static-client in dc2")
k8s.DeployKustomize(t, secondaryContext.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-client-multi-dc")

t.Run("from primary to secondary", func(t *testing.T) {
logger.Log(t, "creating static-server in dc2")
k8s.DeployKustomize(t, secondaryContext.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-server-inject")

logger.Log(t, "creating api-gateway resources in dc1")
out, err := k8s.RunKubectlAndGetOutputE(t, primaryContext.KubectlOptions(t), "apply", "-k", "../fixtures/bases/api-gateway")
require.NoError(t, err, out)
helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
// Ignore errors here because if the test ran as expected
// the custom resources will have been deleted.
k8s.RunKubectlAndGetOutputE(t, primaryContext.KubectlOptions(t), "delete", "-k", "../fixtures/bases/api-gateway")
})

// create a service resolver for doing cross-dc redirects.
k8s.KubectlApplyK(t, secondaryContext.KubectlOptions(t), "../fixtures/cases/api-gateways/dc1-to-dc2-resolver")
helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
k8s.KubectlDeleteK(t, secondaryContext.KubectlOptions(t), "../fixtures/cases/api-gateways/dc1-to-dc2-resolver")
})

// patching the route to target a MeshService since we don't have the corresponding Kubernetes service in this
// cluster.
k8s.RunKubectl(t, primaryContext.KubectlOptions(t), "patch", "httproute", "http-route", "-p", `{"spec":{"rules":[{"backendRefs":[{"group":"consul.hashicorp.com","kind":"MeshService","name":"mesh-service","port":80}]}]}}`, "--type=merge")

checkConnectivity(t, primaryContext, primaryClient)
})

t.Run("from secondary to primary", func(t *testing.T) {
// Check that we can connect services over the mesh gateways
logger.Log(t, "creating static-server in dc1")
k8s.DeployKustomize(t, primaryContext.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-server-inject")

logger.Log(t, "creating api-gateway resources in dc2")
out, err := k8s.RunKubectlAndGetOutputE(t, secondaryContext.KubectlOptions(t), "apply", "-k", "../fixtures/bases/api-gateway")
require.NoError(t, err, out)
helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
// Ignore errors here because if the test ran as expected
// the custom resources will have been deleted.
k8s.RunKubectlAndGetOutputE(t, secondaryContext.KubectlOptions(t), "delete", "-k", "../fixtures/bases/api-gateway")
})

// create a service resolver for doing cross-dc redirects.
k8s.KubectlApplyK(t, secondaryContext.KubectlOptions(t), "../fixtures/cases/api-gateways/dc2-to-dc1-resolver")
helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
k8s.KubectlDeleteK(t, secondaryContext.KubectlOptions(t), "../fixtures/cases/api-gateways/dc2-to-dc1-resolver")
})

// patching the route to target a MeshService since we don't have the corresponding Kubernetes service in this
// cluster.
k8s.RunKubectl(t, secondaryContext.KubectlOptions(t), "patch", "httproute", "http-route", "-p", `{"spec":{"rules":[{"backendRefs":[{"group":"consul.hashicorp.com","kind":"MeshService","name":"mesh-service","port":80}]}]}}`, "--type=merge")

checkConnectivity(t, secondaryContext, primaryClient)
})
}

func checkConnectivity(t *testing.T, ctx environment.TestContext, client *api.Client) {
k8sClient := ctx.ControllerRuntimeClient(t)

// On startup, the controller can take upwards of 1m to perform
// leader election so we may need to wait a long time for
// the reconcile loop to run (hence the 1m timeout here).
var gatewayAddress string
counter := &retry.Counter{Count: 600, Wait: 2 * time.Second}
retry.RunWith(counter, t, func(r *retry.R) {
var gateway gwv1beta1.Gateway
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: "gateway", Namespace: "default"}, &gateway)
require.NoError(r, err)

// check that we have an address to use
require.Len(r, gateway.Status.Addresses, 1)
// now we know we have an address, set it so we can use it
gatewayAddress = gateway.Status.Addresses[0].Value
})

targetAddress := fmt.Sprintf("http://%s/", gatewayAddress)

logger.Log(t, "checking that the connection is not successful because there's no intention")
k8s.CheckStaticServerHTTPConnectionFailing(t, ctx.KubectlOptions(t), StaticClientName, targetAddress)

logger.Log(t, "creating intention")
_, _, err := client.ConfigEntries().Set(&api.ServiceIntentionsConfigEntry{
Kind: api.ServiceIntentions,
Name: "static-server",
Sources: []*api.SourceIntention{
{
Name: "gateway",
Action: api.IntentionActionAllow,
},
},
}, nil)
require.NoError(t, err)
defer func() {
_, err := client.ConfigEntries().Delete(api.ServiceIntentions, "static-server", &api.WriteOptions{})
require.NoError(t, err)
}()

logger.Log(t, "checking that connection is successful")
k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), StaticClientName, targetAddress)
}
19 changes: 19 additions & 0 deletions control-plane/api-gateway/cache/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"

"github.com/hashicorp/consul-k8s/control-plane/api-gateway/common"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants"
"github.com/hashicorp/consul-k8s/control-plane/consul"
"github.com/hashicorp/consul-k8s/control-plane/namespaces"
"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -60,6 +61,7 @@ type Config struct {
ConsulClientConfig *consul.Config
ConsulServerConnMgr consul.ServerConnectionManager
NamespacesEnabled bool
Datacenter string
CrossNamespaceACLPolicy string
Logger logr.Logger
}
Expand All @@ -83,6 +85,8 @@ type Cache struct {
synced chan struct{}

kinds []string

datacenter string
}

func New(config Config) *Cache {
Expand All @@ -104,6 +108,7 @@ func New(config Config) *Cache {
synced: make(chan struct{}, len(Kinds)),
logger: config.Logger,
crossNamespaceACLPolicy: config.CrossNamespaceACLPolicy,
datacenter: config.Datacenter,
}
}

Expand Down Expand Up @@ -216,6 +221,19 @@ func (c *Cache) updateAndNotify(ctx context.Context, once *sync.Once, kind strin
cache := common.NewReferenceMap()

for _, entry := range entries {
meta := entry.GetMeta()
if meta[constants.MetaKeyKubeName] == "" || meta[constants.MetaKeyDatacenter] != c.datacenter {
// Don't process things that don't belong to us. The main reason
// for this is so that we don't garbage collect config entries that
// are either user-created or that another controller running in a
// federated datacenter creates. While we still allow for competing controllers
// syncing/overriding each other due to conflicting Kubernetes objects in
// two federated clusters (which is what the rest of the controllers also allow
// for), we don't want to delete a config entry just because we don't have
// its corresponding Kubernetes object if we know it belongs to another datacenter.
continue
}

cache.Set(common.EntryToReference(entry), entry)
}

Expand Down Expand Up @@ -336,6 +354,7 @@ func (c *Cache) ensureRole(client *api.Client) (string, error) {
}

aclRoleName := "managed-gateway-acl-role"

aclRole, _, err := client.ACL().RoleReadByName(aclRoleName, &api.QueryOptions{})
if err != nil {
return "", err
Expand Down
Loading

0 comments on commit f20fb7a

Please sign in to comment.