Skip to content

Commit

Permalink
Multiport (#1012)
Browse files Browse the repository at this point in the history
Support a workaround for multi port pods by registering a Consul service per port, and by injecting init containers and envoy sidecars per port. Does not work for services with transparent proxy, metrics, or metrics merging.

- Acceptance test for multiport
  - The multiport app is just http-echo listening on different ports in 2 containers
  - First, the test deploys `static-client` and `multiport`, with static-client having `multiport` and `multiport-admin` as upstreams. It checks that `static-client` can make connections to each service-- `multiport`, and `multiport-admin`. This is to test inbound connections for a multiport app.
  - Then the test deploys `static-server`, and checks that `multiport` can make a connection to `static-server`. This is to test outbound connections from a multiport app. Note that there is only 1 intention because all upstream connections are configured and go through the 1st service's envoy proxy.
  
- Implementation for multiport
  - `handler`
    - uses the service name annotation to figure out if this is a multiport pod or not
    - inject a connect-init container and an envoy-sidecar container per port/service
    - mount all service account tokens not already on the pod. For any service whose service account is not already attached to the pod, this mounts a volume for that service account token. In container-init, these volumes are added as a container volumeMount so the service account token file can be passed to the connect-init command via `-bearer-token-file`
  - `container_init` -- update the template to pass in arguments specific to the multiport service
  - `envoy_sidecar` -- allow the bootstrap file path to have the service name in it for multiport services, and add `--base-id`  since there could be multiple envoy sidecars. 
  - `endpoints_controller`: the listener port should be 20000 for the first service, 20001 for the second, etc. Also only configures upstreams on first service port in a multiport pod.
  - `connect_init` command: when its a multiport app, add a filter to poll by service name, use the service specific bearertoken, acl-token-sink, and proxyid files
  • Loading branch information
ndhanushkodi authored Feb 22, 2022
1 parent ae206c9 commit 06ab056
Show file tree
Hide file tree
Showing 33 changed files with 1,637 additions and 204 deletions.
44 changes: 28 additions & 16 deletions acceptance/framework/k8s/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
)

const staticClientName = "static-client"

// Deploy creates a Kubernetes deployment by applying configuration stored at filepath,
// sets up a cleanup function and waits for the deployment to become available.
func Deploy(t *testing.T, options *k8s.KubectlOptions, noCleanupOnFailure bool, debugDirectory string, filepath string) {
Expand Down Expand Up @@ -71,38 +69,43 @@ func DeployKustomize(t *testing.T, options *k8s.KubectlOptions, noCleanupOnFailu
RunKubectl(t, options, "wait", "--for=condition=available", "--timeout=5m", fmt.Sprintf("deploy/%s", deployment.Name))
}

// CheckStaticServerConnection execs into a pod of the deployment given by deploymentName
// CheckStaticServerConnection execs into a pod of sourceApp
// and runs a curl command with the provided curlArgs.
// This function assumes that the connection is made to the static-server and expects the output
// to be "hello world" in a case of success.
// to be "hello world" by default, or expectedSuccessOutput in a case of success.
// If expectSuccess is true, it will expect connection to succeed,
// otherwise it will expect failure due to intentions.
func CheckStaticServerConnection(t *testing.T, options *k8s.KubectlOptions, expectSuccess bool, failureMessages []string, curlArgs ...string) {
func CheckStaticServerConnection(t *testing.T, options *k8s.KubectlOptions, sourceApp string, expectSuccess bool, failureMessages []string, expectedSuccessOutput string, curlArgs ...string) {
t.Helper()

CheckStaticServerConnectionMultipleFailureMessages(t, options, expectSuccess, failureMessages, curlArgs...)
CheckStaticServerConnectionMultipleFailureMessages(t, options, sourceApp, expectSuccess, failureMessages, expectedSuccessOutput, curlArgs...)
}

// CheckStaticServerConnectionMultipleFailureMessages execs into a pod of the deployment given by deploymentName
// CheckStaticServerConnectionMultipleFailureMessages execs into a pod of sourceApp
// and runs a curl command with the provided curlArgs.
// This function assumes that the connection is made to the static-server and expects the output
// to be "hello world" in a case of success.
// to be "hello world" by default, or expectedSuccessOutput in a case of success.
// If expectSuccess is true, it will expect connection to succeed,
// otherwise it will expect failure due to intentions. If multiple failureMessages are provided it will assert
// on the existence of any of them.
func CheckStaticServerConnectionMultipleFailureMessages(t *testing.T, options *k8s.KubectlOptions, expectSuccess bool, failureMessages []string, curlArgs ...string) {
func CheckStaticServerConnectionMultipleFailureMessages(t *testing.T, options *k8s.KubectlOptions, sourceApp string, expectSuccess bool, failureMessages []string, expectedSuccessOutput string, curlArgs ...string) {
t.Helper()

expectedOutput := "hello world"
if expectedSuccessOutput != "" {
expectedOutput = expectedSuccessOutput
}

retrier := &retry.Timer{Timeout: 80 * time.Second, Wait: 2 * time.Second}

args := []string{"exec", "deploy/" + staticClientName, "-c", staticClientName, "--", "curl", "-vvvsSf"}
args := []string{"exec", "deploy/" + sourceApp, "-c", sourceApp, "--", "curl", "-vvvsSf"}
args = append(args, curlArgs...)

retry.RunWith(retrier, t, func(r *retry.R) {
output, err := RunKubectlAndGetOutputE(t, options, args...)
if expectSuccess {
require.NoError(r, err)
require.Contains(r, output, "hello world")
require.Contains(r, output, expectedOutput)
} else {
require.Error(r, err)
require.Condition(r, func() bool {
Expand All @@ -118,24 +121,33 @@ func CheckStaticServerConnectionMultipleFailureMessages(t *testing.T, options *k
})
}

// CheckStaticServerConnectionSuccessfulWithMessage is just like CheckStaticServerConnectionSuccessful
// but it asserts on a non-default expected message.
func CheckStaticServerConnectionSuccessfulWithMessage(t *testing.T, options *k8s.KubectlOptions, sourceApp string, message string, curlArgs ...string) {
t.Helper()
start := time.Now()
CheckStaticServerConnectionMultipleFailureMessages(t, options, sourceApp, true, nil, message, curlArgs...)
logger.Logf(t, "Took %s to check if static server connection was successful", time.Since(start))
}

// CheckStaticServerConnectionSuccessful is just like CheckStaticServerConnection
// but it always expects a successful connection.
func CheckStaticServerConnectionSuccessful(t *testing.T, options *k8s.KubectlOptions, curlArgs ...string) {
func CheckStaticServerConnectionSuccessful(t *testing.T, options *k8s.KubectlOptions, sourceApp string, curlArgs ...string) {
t.Helper()
start := time.Now()
CheckStaticServerConnection(t, options, true, nil, curlArgs...)
CheckStaticServerConnection(t, options, sourceApp, true, nil, "", curlArgs...)
logger.Logf(t, "Took %s to check if static server connection was successful", time.Since(start))
}

// CheckStaticServerConnectionFailing is just like CheckStaticServerConnection
// but it always expects a failing connection with various errors.
func CheckStaticServerConnectionFailing(t *testing.T, options *k8s.KubectlOptions, curlArgs ...string) {
func CheckStaticServerConnectionFailing(t *testing.T, options *k8s.KubectlOptions, sourceApp string, curlArgs ...string) {
t.Helper()
CheckStaticServerConnection(t, options, false, []string{
CheckStaticServerConnection(t, options, sourceApp, false, []string{
"curl: (52) Empty reply from server",
"curl: (7) Failed to connect",
"curl: (56) Recv failure: Connection reset by peer",
}, curlArgs...)
}, "", curlArgs...)
}

// labelMapToString takes a label map[string]string
Expand Down
12 changes: 6 additions & 6 deletions acceptance/tests/connect/connect_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func ConnectInjectConnectivityCheck(t *testing.T, ctx environment.TestContext, c
if secure {
logger.Log(t, "checking that the connection is not successful because there's no intention")
if cfg.EnableTransparentProxy {
k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), "http://static-server")
k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), staticClientName, "http://static-server")
} else {
k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), "http://localhost:1234")
k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234")
}

logger.Log(t, "creating intention")
Expand All @@ -103,9 +103,9 @@ func ConnectInjectConnectivityCheck(t *testing.T, ctx environment.TestContext, c
logger.Log(t, "checking that connection is successful")
if cfg.EnableTransparentProxy {
// todo: add an assertion that the traffic is going through the proxy
k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://static-server")
k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://static-server")
} else {
k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://localhost:1234")
k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234")
}

// Test that kubernetes readiness status is synced to Consul.
Expand All @@ -120,8 +120,8 @@ func ConnectInjectConnectivityCheck(t *testing.T, ctx environment.TestContext, c
// from server, which is the case when a connection is unsuccessful due to intentions in other tests.
logger.Log(t, "checking that connection is unsuccessful")
if cfg.EnableTransparentProxy {
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server port 80: Connection refused"}, "http://static-server")
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server port 80: Connection refused"}, "", "http://static-server")
} else {
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "http://localhost:1234")
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234")
}
}
12 changes: 6 additions & 6 deletions acceptance/tests/connect/connect_inject_namespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ func TestConnectInjectNamespaces(t *testing.T) {
if c.secure {
logger.Log(t, "checking that the connection is not successful because there's no intention")
if cfg.EnableTransparentProxy {
k8s.CheckStaticServerConnectionFailing(t, staticClientOpts, fmt.Sprintf("http://static-server.%s", staticServerNamespace))
k8s.CheckStaticServerConnectionFailing(t, staticClientOpts, staticClientName, fmt.Sprintf("http://static-server.%s", staticServerNamespace))
} else {
k8s.CheckStaticServerConnectionFailing(t, staticClientOpts, "http://localhost:1234")
k8s.CheckStaticServerConnectionFailing(t, staticClientOpts, staticClientName, "http://localhost:1234")
}

intention := &api.ServiceIntentionsConfigEntry{
Expand Down Expand Up @@ -209,9 +209,9 @@ func TestConnectInjectNamespaces(t *testing.T) {

logger.Log(t, "checking that connection is successful")
if cfg.EnableTransparentProxy {
k8s.CheckStaticServerConnectionSuccessful(t, staticClientOpts, fmt.Sprintf("http://static-server.%s", staticServerNamespace))
k8s.CheckStaticServerConnectionSuccessful(t, staticClientOpts, staticClientName, fmt.Sprintf("http://static-server.%s", staticServerNamespace))
} else {
k8s.CheckStaticServerConnectionSuccessful(t, staticClientOpts, "http://localhost:1234")
k8s.CheckStaticServerConnectionSuccessful(t, staticClientOpts, staticClientName, "http://localhost:1234")
}

// Test that kubernetes readiness status is synced to Consul.
Expand All @@ -226,9 +226,9 @@ func TestConnectInjectNamespaces(t *testing.T) {
// from server, which is the case when a connection is unsuccessful due to intentions in other tests.
logger.Log(t, "checking that connection is unsuccessful")
if cfg.EnableTransparentProxy {
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, staticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, fmt.Sprintf("http://static-server.%s", staticServerNamespace))
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, staticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, "", fmt.Sprintf("http://static-server.%s", staticServerNamespace))
} else {
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, staticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "http://localhost:1234")
k8s.CheckStaticServerConnectionMultipleFailureMessages(t, staticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234")
}
})
}
Expand Down
Loading

0 comments on commit 06ab056

Please sign in to comment.