Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[API Gateway] WAN Federation test and fixes #2295

Merged
merged 2 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

# Kustomization wrapper so the ServiceResolver fixture in this directory can
# be applied/deleted as a unit with `kubectl apply -k` / `kubectl delete -k`.
resources:
  - serviceresolver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

# ServiceResolver that redirects traffic addressed to static-server to the
# instance of static-server running in datacenter dc2 — used by the WAN
# federation gateway test for the dc1 -> dc2 cross-datacenter case.
apiVersion: consul.hashicorp.com/v1alpha1
kind: ServiceResolver
metadata:
  name: static-server
spec:
  redirect:
    service: static-server
    datacenter: dc2
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

# Kustomization wrapper so the ServiceResolver fixture in this directory can
# be applied/deleted as a unit with `kubectl apply -k` / `kubectl delete -k`.
resources:
  - serviceresolver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

# ServiceResolver that redirects traffic addressed to static-server to the
# instance of static-server running in datacenter dc1 — used by the WAN
# federation gateway test for the dc2 -> dc1 cross-datacenter case.
apiVersion: consul.hashicorp.com/v1alpha1
kind: ServiceResolver
metadata:
  name: static-server
spec:
  redirect:
    service: static-server
    datacenter: dc1
241 changes: 241 additions & 0 deletions acceptance/tests/wan-federation/wan_federation_gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package wanfederation

import (
"context"
"fmt"
"testing"
"time"

"github.com/hashicorp/consul-k8s/acceptance/framework/consul"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
"github.com/hashicorp/consul-k8s/acceptance/framework/helpers"
"github.com/hashicorp/consul-k8s/acceptance/framework/k8s"
"github.com/hashicorp/consul-k8s/acceptance/framework/logger"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/serf/testutil/retry"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// TestWANFederation_Gateway stands up two WAN-federated Consul datacenters
// (dc1 as the primary, dc2 as the secondary) on two Kubernetes clusters and
// verifies that an API gateway deployed in one datacenter can route requests
// to a static-server running in the other datacenter, using a ServiceResolver
// redirect plus an HTTPRoute patched to target a MeshService.
func TestWANFederation_Gateway(t *testing.T) {
	env := suite.Environment()
	cfg := suite.Config()

	if cfg.UseKind {
		// the only way this test can currently run on kind, at least on a Mac, is via leveraging MetalLB, which
		// isn't in CI, so we just skip for now.
		t.Skipf("skipping wan federation tests as they currently fail on Kind even though they work on other clouds.")
	}

	primaryContext := env.DefaultContext(t)
	secondaryContext := env.Context(t, environment.SecondaryContextName)

	// dc1 is the primary: it creates the federation secret (CA material,
	// replication token, server config) that the secondary consumes below.
	primaryHelmValues := map[string]string{
		"global.datacenter": "dc1",

		"global.tls.enabled":   "true",
		"global.tls.httpsOnly": "true",

		"global.federation.enabled":                "true",
		"global.federation.createFederationSecret": "true",

		"global.acls.manageSystemACLs":       "true",
		"global.acls.createReplicationToken": "true",

		"connectInject.enabled":  "true",
		"connectInject.replicas": "1",

		"meshGateway.enabled":  "true",
		"meshGateway.replicas": "1",
	}

	releaseName := helpers.RandomName()

	// Install the primary consul cluster in the default kubernetes context
	primaryConsulCluster := consul.NewHelmCluster(t, primaryHelmValues, primaryContext, cfg, releaseName)
	primaryConsulCluster.Create(t)

	// Get the federation secret from the primary cluster and apply it to secondary cluster
	federationSecretName := fmt.Sprintf("%s-consul-federation", releaseName)
	logger.Logf(t, "retrieving federation secret %s from the primary cluster and applying to the secondary", federationSecretName)
	federationSecret, err := primaryContext.KubernetesClient(t).CoreV1().Secrets(primaryContext.KubectlOptions(t).Namespace).Get(context.Background(), federationSecretName, metav1.GetOptions{})
	require.NoError(t, err)
	// ResourceVersion must be cleared before the object can be created in a
	// different cluster; the API server rejects creates with one set.
	federationSecret.ResourceVersion = ""
	_, err = secondaryContext.KubernetesClient(t).CoreV1().Secrets(secondaryContext.KubectlOptions(t).Namespace).Create(context.Background(), federationSecret, metav1.CreateOptions{})
	require.NoError(t, err)

	var k8sAuthMethodHost string
	// When running on kind, the kube API address in kubeconfig will have a localhost address
	// which will not work from inside the container. That's why we need to use the endpoints address instead
	// which will point the node IP.
	if cfg.UseKind {
		// The Kubernetes AuthMethod host is read from the endpoints for the Kubernetes service.
		kubernetesEndpoint, err := secondaryContext.KubernetesClient(t).CoreV1().Endpoints("default").Get(context.Background(), "kubernetes", metav1.GetOptions{})
		require.NoError(t, err)
		k8sAuthMethodHost = fmt.Sprintf("%s:%d", kubernetesEndpoint.Subsets[0].Addresses[0].IP, kubernetesEndpoint.Subsets[0].Ports[0].Port)
	} else {
		k8sAuthMethodHost = k8s.KubernetesAPIServerHostFromOptions(t, secondaryContext.KubectlOptions(t))
	}

	// Create secondary cluster
	// dc2 joins the federation by reusing the CA cert/key, replication token,
	// and server config carried in the federation secret copied above.
	secondaryHelmValues := map[string]string{
		"global.datacenter": "dc2",

		"global.tls.enabled":           "true",
		"global.tls.httpsOnly":         "false",
		"global.acls.manageSystemACLs": "true",
		"global.tls.caCert.secretName": federationSecretName,
		"global.tls.caCert.secretKey":  "caCert",
		"global.tls.caKey.secretName":  federationSecretName,
		"global.tls.caKey.secretKey":   "caKey",

		"global.federation.enabled": "true",

		"server.extraVolumes[0].type":          "secret",
		"server.extraVolumes[0].name":          federationSecretName,
		"server.extraVolumes[0].load":          "true",
		"server.extraVolumes[0].items[0].key":  "serverConfigJSON",
		"server.extraVolumes[0].items[0].path": "config.json",

		"connectInject.enabled":  "true",
		"connectInject.replicas": "1",

		"meshGateway.enabled":  "true",
		"meshGateway.replicas": "1",

		"global.acls.replicationToken.secretName": federationSecretName,
		"global.acls.replicationToken.secretKey":  "replicationToken",
		"global.federation.k8sAuthMethodHost":     k8sAuthMethodHost,
		"global.federation.primaryDatacenter":     "dc1",
	}

	// Install the secondary consul cluster in the secondary kubernetes context
	secondaryConsulCluster := consul.NewHelmCluster(t, secondaryHelmValues, secondaryContext, cfg, releaseName)
	secondaryConsulCluster.Create(t)

	primaryClient, _ := primaryConsulCluster.SetupConsulClient(t, true)
	secondaryClient, _ := secondaryConsulCluster.SetupConsulClient(t, true)

	// Verify federation between servers
	logger.Log(t, "verifying federation was successful")
	helpers.VerifyFederation(t, primaryClient, secondaryClient, releaseName, true)

	// Create a ProxyDefaults resource to configure services to use the mesh
	// gateways.
	logger.Log(t, "creating proxy-defaults config in dc1")
	kustomizeDir := "../fixtures/cases/api-gateways/mesh"
	k8s.KubectlApplyK(t, primaryContext.KubectlOptions(t), kustomizeDir)
	helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
		k8s.KubectlDeleteK(t, primaryContext.KubectlOptions(t), kustomizeDir)
	})

	// these clients are just there so we can exec in and curl on them.
	logger.Log(t, "creating static-client in dc1")
	k8s.DeployKustomize(t, primaryContext.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-client-multi-dc")

	logger.Log(t, "creating static-client in dc2")
	k8s.DeployKustomize(t, secondaryContext.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-client-multi-dc")

	t.Run("from primary to secondary", func(t *testing.T) {
		// Gateway lives in dc1; the backing static-server lives in dc2.
		logger.Log(t, "creating static-server in dc2")
		k8s.DeployKustomize(t, secondaryContext.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-server-inject")

		logger.Log(t, "creating api-gateway resources in dc1")
		out, err := k8s.RunKubectlAndGetOutputE(t, primaryContext.KubectlOptions(t), "apply", "-k", "../fixtures/bases/api-gateway")
		require.NoError(t, err, out)
		helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
			// Ignore errors here because if the test ran as expected
			// the custom resources will have been deleted.
			k8s.RunKubectlAndGetOutputE(t, primaryContext.KubectlOptions(t), "delete", "-k", "../fixtures/bases/api-gateway")
		})

		// create a service resolver for doing cross-dc redirects.
		k8s.KubectlApplyK(t, secondaryContext.KubectlOptions(t), "../fixtures/cases/api-gateways/dc1-to-dc2-resolver")
		helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
			k8s.KubectlDeleteK(t, secondaryContext.KubectlOptions(t), "../fixtures/cases/api-gateways/dc1-to-dc2-resolver")
		})

		// patching the route to target a MeshService since we don't have the corresponding Kubernetes service in this
		// cluster.
		k8s.RunKubectl(t, primaryContext.KubectlOptions(t), "patch", "httproute", "http-route", "-p", `{"spec":{"rules":[{"backendRefs":[{"group":"consul.hashicorp.com","kind":"MeshService","name":"mesh-service","port":80}]}]}}`, "--type=merge")

		checkConnectivity(t, primaryContext, primaryClient)
	})

	t.Run("from secondary to primary", func(t *testing.T) {
		// Check that we can connect services over the mesh gateways
		logger.Log(t, "creating static-server in dc1")
		k8s.DeployKustomize(t, primaryContext.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-server-inject")

		logger.Log(t, "creating api-gateway resources in dc2")
		out, err := k8s.RunKubectlAndGetOutputE(t, secondaryContext.KubectlOptions(t), "apply", "-k", "../fixtures/bases/api-gateway")
		require.NoError(t, err, out)
		helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
			// Ignore errors here because if the test ran as expected
			// the custom resources will have been deleted.
			k8s.RunKubectlAndGetOutputE(t, secondaryContext.KubectlOptions(t), "delete", "-k", "../fixtures/bases/api-gateway")
		})

		// create a service resolver for doing cross-dc redirects.
		k8s.KubectlApplyK(t, secondaryContext.KubectlOptions(t), "../fixtures/cases/api-gateways/dc2-to-dc1-resolver")
		helpers.Cleanup(t, cfg.NoCleanupOnFailure, func() {
			k8s.KubectlDeleteK(t, secondaryContext.KubectlOptions(t), "../fixtures/cases/api-gateways/dc2-to-dc1-resolver")
		})

		// patching the route to target a MeshService since we don't have the corresponding Kubernetes service in this
		// cluster.
		k8s.RunKubectl(t, secondaryContext.KubectlOptions(t), "patch", "httproute", "http-route", "-p", `{"spec":{"rules":[{"backendRefs":[{"group":"consul.hashicorp.com","kind":"MeshService","name":"mesh-service","port":80}]}]}}`, "--type=merge")

		// NOTE(review): primaryClient is passed here too (not secondaryClient) —
		// presumably because the intention checkConnectivity writes is a global
		// config entry that must be created in the primary datacenter under WAN
		// federation with ACL replication; confirm this is intentional.
		checkConnectivity(t, secondaryContext, primaryClient)
	})
}

// checkConnectivity verifies end-to-end routing through the API gateway in the
// given Kubernetes context: it waits for the Gateway resource to be assigned an
// address, asserts that requests fail while no intention exists, then creates a
// gateway->static-server allow intention (via the given Consul client) and
// asserts that requests succeed. The intention is deleted when the check ends.
func checkConnectivity(t *testing.T, ctx environment.TestContext, client *api.Client) {
	k8sClient := ctx.ControllerRuntimeClient(t)

	// On startup the controller can take upwards of a minute for leader
	// election before its reconcile loop runs, so wait generously for the
	// gateway address to appear (600 attempts x 2s = 20 minutes max).
	var gatewayHost string
	retrier := &retry.Counter{Count: 600, Wait: 2 * time.Second}
	retry.RunWith(retrier, t, func(r *retry.R) {
		var gw gwv1beta1.Gateway
		require.NoError(r, k8sClient.Get(context.Background(), types.NamespacedName{Name: "gateway", Namespace: "default"}, &gw))

		// The gateway is only routable once exactly one address is assigned;
		// capture it for use outside the retry loop.
		require.Len(r, gw.Status.Addresses, 1)
		gatewayHost = gw.Status.Addresses[0].Value
	})

	targetAddress := fmt.Sprintf("http://%s/", gatewayHost)

	// With no intention in place, traffic must be denied.
	logger.Log(t, "checking that the connection is not successful because there's no intention")
	k8s.CheckStaticServerHTTPConnectionFailing(t, ctx.KubectlOptions(t), StaticClientName, targetAddress)

	// Allow gateway -> static-server traffic.
	logger.Log(t, "creating intention")
	intention := &api.ServiceIntentionsConfigEntry{
		Kind: api.ServiceIntentions,
		Name: "static-server",
		Sources: []*api.SourceIntention{{
			Name:   "gateway",
			Action: api.IntentionActionAllow,
		}},
	}
	_, _, err := client.ConfigEntries().Set(intention, nil)
	require.NoError(t, err)
	defer func() {
		// Remove the intention so the cluster is left as we found it.
		_, err := client.ConfigEntries().Delete(api.ServiceIntentions, "static-server", &api.WriteOptions{})
		require.NoError(t, err)
	}()

	// With the intention in place the same request should now succeed.
	logger.Log(t, "checking that connection is successful")
	k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), StaticClientName, targetAddress)
}
19 changes: 19 additions & 0 deletions control-plane/api-gateway/cache/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"

"github.com/hashicorp/consul-k8s/control-plane/api-gateway/common"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants"
"github.com/hashicorp/consul-k8s/control-plane/consul"
"github.com/hashicorp/consul-k8s/control-plane/namespaces"
"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -60,6 +61,7 @@ type Config struct {
ConsulClientConfig *consul.Config
ConsulServerConnMgr consul.ServerConnectionManager
NamespacesEnabled bool
Datacenter string
CrossNamespaceACLPolicy string
Logger logr.Logger
}
Expand All @@ -83,6 +85,8 @@ type Cache struct {
synced chan struct{}

kinds []string

datacenter string
}

func New(config Config) *Cache {
Expand All @@ -104,6 +108,7 @@ func New(config Config) *Cache {
synced: make(chan struct{}, len(Kinds)),
logger: config.Logger,
crossNamespaceACLPolicy: config.CrossNamespaceACLPolicy,
datacenter: config.Datacenter,
}
}

Expand Down Expand Up @@ -216,6 +221,19 @@ func (c *Cache) updateAndNotify(ctx context.Context, once *sync.Once, kind strin
cache := common.NewReferenceMap()

for _, entry := range entries {
meta := entry.GetMeta()
if meta[constants.MetaKeyKubeName] == "" || meta[constants.MetaKeyDatacenter] != c.datacenter {
// Don't process things that don't belong to us. The main reason
// for this is so that we don't garbage collect config entries that
// are either user-created or that another controller running in a
// federated datacenter creates. While we still allow for competing controllers
// syncing/overriding each other due to conflicting Kubernetes objects in
// two federated clusters (which is what the rest of the controllers also allow
// for), we don't want to delete a config entry just because we don't have
// its corresponding Kubernetes object if we know it belongs to another datacenter.
continue
}

cache.Set(common.EntryToReference(entry), entry)
}

Expand Down Expand Up @@ -336,6 +354,7 @@ func (c *Cache) ensureRole(client *api.Client) (string, error) {
}

aclRoleName := "managed-gateway-acl-role"

aclRole, _, err := client.ACL().RoleReadByName(aclRoleName, &api.QueryOptions{})
if err != nil {
return "", err
Expand Down
Loading