Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add service registration polling to connect-init #452

Merged
merged 20 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions connect-inject/container_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,19 +334,19 @@ EOF
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
{{- end}}
{{- if .AuthMethod }}
consul-k8s connect-init -acl-auth-method="{{ .AuthMethod }}" \
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
{{- if .AuthMethod }}
-acl-auth-method="{{ .AuthMethod }}" \
{{- if .ConsulNamespace }}
{{- if .NamespaceMirroringEnabled }}
{{- /* If namespace mirroring is enabled, the auth method is
defined in the default namespace */}}
-namespace="default" \
-namespace="default"
{{- else }}
-namespace="{{ .ConsulNamespace }}" \
-namespace="{{ .ConsulNamespace }}"
{{- end }}
{{- end }}
{{- end }}
-meta="pod=${POD_NAMESPACE}/${POD_NAME}"
{{- end }}

# Register the service. The HCL is stored in the volume so that
# the preStop hook can access it to deregister the service.
Expand Down
19 changes: 11 additions & 8 deletions connect-inject/container_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestHandlerContainerInit(t *testing.T) {
`/bin/sh -ec
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \

# Register the service. The HCL is stored in the volume so that
# the preStop hook can access it to deregister the service.
Expand Down Expand Up @@ -736,6 +737,7 @@ func TestHandlerContainerInit_namespacesEnabled(t *testing.T) {
`/bin/sh -ec
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \

# Register the service. The HCL is stored in the volume so that
# the preStop hook can access it to deregister the service.
Expand Down Expand Up @@ -808,6 +810,7 @@ EOF
`/bin/sh -ec
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \

# Register the service. The HCL is stored in the volume so that
# the preStop hook can access it to deregister the service.
Expand Down Expand Up @@ -881,9 +884,9 @@ EOF
`/bin/sh -ec
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -acl-auth-method="auth-method" \
-namespace="non-default" \
-meta="pod=${POD_NAMESPACE}/${POD_NAME}"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-acl-auth-method="auth-method" \
-namespace="non-default"

# Register the service. The HCL is stored in the volume so that
# the preStop hook can access it to deregister the service.
Expand Down Expand Up @@ -960,9 +963,9 @@ EOF
`/bin/sh -ec
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -acl-auth-method="auth-method" \
-namespace="default" \
-meta="pod=${POD_NAMESPACE}/${POD_NAME}"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-acl-auth-method="auth-method" \
-namespace="default"

# Register the service. The HCL is stored in the volume so that
# the preStop hook can access it to deregister the service.
Expand Down Expand Up @@ -1143,8 +1146,8 @@ func TestHandlerContainerInit_authMethod(t *testing.T) {
require.NoError(err)
actual := strings.Join(container.Command, " ")
require.Contains(actual, `
consul-k8s connect-init -acl-auth-method="release-name-consul-k8s-auth-method" \
-meta="pod=${POD_NAMESPACE}/${POD_NAME}"`)
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-acl-auth-method="release-name-consul-k8s-auth-method"`)
require.Contains(actual, `
/consul/connect-inject/consul services register \
-token-file="/consul/connect-inject/acl-token" \
Expand Down
2 changes: 1 addition & 1 deletion subcommand/common/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func GenerateServerCerts(t *testing.T) (string, string, string, func()) {
// name. It will remove the file once the test completes.
func WriteTempFile(t *testing.T, contents string) string {
t.Helper()
file, err := ioutil.TempFile("", contents)
file, err := ioutil.TempFile("", "testName")
require.NoError(t, err)
_, err = file.WriteString(contents)
require.NoError(t, err)
Expand Down
146 changes: 108 additions & 38 deletions subcommand/connect-init/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connectinit
import (
"flag"
"fmt"
"io/ioutil"
"sync"
"time"

Expand All @@ -14,42 +15,61 @@ import (
"github.com/mitchellh/cli"
)

const bearerTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
const tokenSinkFile = "/consul/connect-inject/acl-token"
const numLoginRetries = 3
// Default file locations used when the corresponding Command field is left empty.
const (
	// defaultBearerTokenFile is where the bearer token for ACL login is read from.
	defaultBearerTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
	// defaultTokenSinkFile is where the ACL token obtained by the login is written.
	defaultTokenSinkFile = "/consul/connect-inject/acl-token"
	// defaultProxyIDFile is where the ID of the connect-proxy service is written.
	defaultProxyIDFile = "/consul/connect-inject/proxyid"
)

// Retry budgets. Both loops retry on a constant one-second backoff.
const (
	// numLoginRetries bounds the ACL login attempts.
	numLoginRetries = 3
	// defaultServicePollingRetries bounds the service registration polls
	// (one per second, so roughly 60s overall).
	defaultServicePollingRetries = 60
)

// Command holds the flags and state of the connect-init subcommand. Its Run
// method optionally performs a Consul ACL login and, unless polling is
// skipped, waits for the services registered for this pod and writes the
// connect-proxy service ID to proxyIDFile.
type Command struct {
UI cli.Ui

flagACLAuthMethod string // Auth Method to use for ACLs, if enabled
flagMeta map[string]string // Flag for metadata to consul login.
flagBearerTokenFile string // Location of the bearer token.
flagTokenSinkFile string // Location to write the output token.
flagACLAuthMethod string // Auth Method to use for ACLs, if enabled.
flagPodName string // Pod name; required by Run.
flagPodNamespace string // Pod namespace; required by Run.
flagSkipServiceRegistrationPolling bool // Whether or not to skip polling for service registration (the ACL login still runs).

bearerTokenFile string // Location of the bearer token. Default is defaultBearerTokenFile.
tokenSinkFile string // Location to write the output token. Default is defaultTokenSinkFile.
proxyIDFile string // Location to write the output proxyID. Default is defaultProxyIDFile.
serviceRegistrationPollingAttempts uint64 // Number of times to poll for this service to be registered. Default is defaultServicePollingRetries.

flagSet *flag.FlagSet // Flag set built by init.
http *flags.HTTPFlags // Consul HTTP flags, merged into flagSet by init.

consulClient *api.Client

once sync.Once // NOTE(review): presumably guards a single call to init; the call site is not visible here.
help string // Usage text generated from flagSet by init.
}

// init builds the command's flag set, fills in defaults for any file path or
// polling-attempt count that is still at its zero value, merges in the Consul
// HTTP flags and generates the help text.
func (c *Command) init() {
c.flagSet = flag.NewFlagSet("", flag.ContinueOnError)
c.flagSet.StringVar(&c.flagACLAuthMethod, "acl-auth-method", "",
"Name of the auth method to login to.")
c.flagSet.Var((*flags.FlagMapValue)(&c.flagMeta), "meta",
"Metadata to set on the token, formatted as key=value. This flag may be specified multiple "+
"times to set multiple meta fields.")
c.flagSet.StringVar(&c.flagBearerTokenFile, "bearer-token-file", bearerTokenFile,
"Path to a file containing a secret bearer token to use with this auth method. "+
"Default is /var/run/secrets/kubernetes.io/serviceaccount/token.")
c.flagSet.StringVar(&c.flagTokenSinkFile, "token-sink-file", tokenSinkFile,
"The most recent token's SecretID is kept up to date in this file. Default is /consul/connect-inject/acl-token.")
c.flagSet.StringVar(&c.flagACLAuthMethod, "acl-auth-method", "", "Name of the auth method to login to.")
c.flagSet.StringVar(&c.flagPodName, "pod-name", "", "Name of the pod.")
c.flagSet.StringVar(&c.flagPodNamespace, "pod-namespace", "", "Name of the pod namespace.")

c.http = &flags.HTTPFlags{}
// TODO: when the endpoints controller manages service registration this can be removed.
// For now it preserves backward compatibility: the flag defaults to true, so Run
// returns right after the optional ACL login unless polling is explicitly enabled.
c.flagSet.BoolVar(&c.flagSkipServiceRegistrationPolling, "skip-service-registration-polling", true,
"Flag to preserve backward compatibility with service registration.")

// Only fall back to the package defaults for fields that are still unset, so a
// value placed on the Command before init runs is kept.
if c.bearerTokenFile == "" {
c.bearerTokenFile = defaultBearerTokenFile
}
if c.tokenSinkFile == "" {
c.tokenSinkFile = defaultTokenSinkFile
}
if c.proxyIDFile == "" {
c.proxyIDFile = defaultProxyIDFile
}
if c.serviceRegistrationPollingAttempts == 0 {
c.serviceRegistrationPollingAttempts = defaultServicePollingRetries
}

c.http = &flags.HTTPFlags{}
// Expose the Consul HTTP flags on the same flag set, then render the usage text from it.
flags.Merge(c.flagSet, c.http.Flags())
c.help = flags.Usage(help, c.flagSet)
}
Expand All @@ -60,40 +80,90 @@ func (c *Command) Run(args []string) int {
if err := c.flagSet.Parse(args); err != nil {
return 1
}

// Validate flags.
if c.flagACLAuthMethod == "" {
c.UI.Error("-acl-auth-method must be set")
if c.flagPodName == "" {
c.UI.Error("-pod-name must be set")
return 1
}
if c.flagMeta == nil {
c.UI.Error("-meta must be set")
if c.flagPodNamespace == "" {
c.UI.Error("-pod-namespace must be set")
return 1
}

// TODO: Add namespace support
if c.consulClient == nil {
cfg := api.DefaultConfig()
c.http.MergeOntoConfig(cfg)
c.consulClient, err = consul.NewClient(cfg)
cfg := api.DefaultConfig()
c.http.MergeOntoConfig(cfg)
consulClient, err := consul.NewClient(cfg)
if err != nil {
c.UI.Error(fmt.Sprintf("Unable to get client connection: %s", err))
return 1
}

// First do the ACL Login, if necessary.
if c.flagACLAuthMethod != "" {
// loginMeta is the default metadata that we pass to the consul login API.
loginMeta := map[string]string{"pod": fmt.Sprintf("%s/%s", c.flagPodNamespace, c.flagPodName)}
err = backoff.Retry(func() error {
kschoche marked this conversation as resolved.
Show resolved Hide resolved
err := common.ConsulLogin(consulClient, c.bearerTokenFile, c.flagACLAuthMethod, c.tokenSinkFile, loginMeta)
if err != nil {
c.UI.Error(fmt.Sprintf("Consul login failed; retrying: %s", err))
}
return err
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), numLoginRetries))
if err != nil {
c.UI.Error(fmt.Sprintf("Hit maximum retries for consul login: %s", err))
return 1
}
// Now update the client so that it will read the ACL token we just fetched.
cfg.TokenFile = c.tokenSinkFile
consulClient, err = consul.NewClient(cfg)
if err != nil {
c.UI.Error(fmt.Sprintf("Unable to get client connection: %s", err))
c.UI.Error(fmt.Sprintf("Unable to update client connection: %s", err))
return 1
}
c.UI.Info("Consul login complete")
}
if c.flagSkipServiceRegistrationPolling {
return 0
}

// Now wait for the service to be registered. Do this by querying the Agent for a service
// which maps to this pod+namespace.
var proxyID string
err = backoff.Retry(func() error {
err := common.ConsulLogin(c.consulClient, c.flagBearerTokenFile, c.flagACLAuthMethod, c.flagTokenSinkFile, c.flagMeta)
filter := fmt.Sprintf("Meta[\"pod-name\"] == %s and Meta[\"k8s-namespace\"] == %s", c.flagPodName, c.flagPodNamespace)
serviceList, err := consulClient.Agent().ServicesWithFilter(filter)
if err != nil {
c.UI.Error(fmt.Sprintf("Consul login failed; retrying: %s", err))
c.UI.Error(fmt.Sprintf("Unable to get Agent services: %s", err))
return err
}
// Wait for the service and the connect-proxy service to be registered.
if len(serviceList) != 2 {
c.UI.Info("Unable to find registered services; retrying")
return fmt.Errorf("did not find correct number of services: %d", len(serviceList))
}
for _, svc := range serviceList {
c.UI.Info(fmt.Sprintf("Registered service has been detected: %s", svc.Service))
if svc.Kind == api.ServiceKindConnectProxy {
// This is the proxy service ID.
proxyID = svc.ID
return nil
}
}
return err
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), numLoginRetries))
// In theory we can't reach this point unless we have 2 services registered against
// this pod and neither are the connect-proxy. We don't support this case anyway, but it
// is necessary to return from the function.
return fmt.Errorf("unable to find registered connect-proxy service")
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), c.serviceRegistrationPollingAttempts))
if err != nil {
c.UI.Error(fmt.Sprintf("Timed out waiting for service registration: %v", err))
return 1
}
// Write the proxy ID to the shared volume so `consul connect envoy` can use it for bootstrapping.
err = ioutil.WriteFile(c.proxyIDFile, []byte(proxyID), 0444)
if err != nil {
c.UI.Error(fmt.Sprintf("Hit maximum retries for consul login: %s", err))
c.UI.Error(fmt.Sprintf("Unable to write proxy ID to file: %s", err))
return 1
}
c.UI.Info("Consul login complete")
c.UI.Info("Connect initialization completed")
return 0
}

Expand Down
Loading