Skip to content

Commit

Permalink
Add containerd ready channel to delay etcd node join
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 88178ae)
  • Loading branch information
brandond committed Oct 20, 2021
1 parent b65bcdf commit 3988ede
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 67 deletions.
8 changes: 8 additions & 0 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
}
}

// the agent runtime is ready to host workloads when containerd is up and the airgap
// images have finished loading, as that portion of startup may block for an arbitrary
// amount of time depending on how long it takes to import whatever the user has placed
// in the images directory.
if cfg.AgentReady != nil {
close(cfg.AgentReady)
}

notifySocket := os.Getenv("NOTIFY_SOCKET")
os.Unsetenv("NOTIFY_SOCKET")

Expand Down
10 changes: 9 additions & 1 deletion pkg/agent/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
return err
}

// Do an immediate fill of proxy addresses from the server endpoint list, before going into the
// watch loop. This will fail on the first server, as the apiserver won't be started yet - but
// that's fine because the local server is already seeded into the proxy address list.
endpoint, _ := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
if endpoint != nil {
addresses := util.GetAddresses(endpoint)
Expand All @@ -61,16 +64,21 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
}
}

// Attempt to connect to supervisors, storing their cancellation function for later when we
// need to disconnect.
disconnect := map[string]context.CancelFunc{}

wg := &sync.WaitGroup{}
for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok {
disconnect[address] = connect(ctx, wg, address, tlsConfig)
}
}

// Once the apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
// and go from the cluster. We go into a faster but noisier connect loop if the watch fails
// following a successful connection.
go func() {
util.WaitForAPIServerReady(client, 30*time.Second)
connect:
for {
time.Sleep(5 * time.Second)
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/cmds/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Agent struct {
Taints cli.StringSlice
ImageCredProvBinDir string
ImageCredProvConfig string
AgentReady chan<- struct{}
AgentShared
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/rancher/k3s/pkg/agent/loadbalancer"
"github.com/rancher/k3s/pkg/cli/cmds"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/datadir"
"github.com/rancher/k3s/pkg/etcd"
"github.com/rancher/k3s/pkg/netutil"
Expand Down Expand Up @@ -83,8 +84,11 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
cfg.Token = cfg.ClusterSecret
}

agentReady := make(chan struct{})

serverConfig := server.Config{}
serverConfig.DisableAgent = cfg.DisableAgent
serverConfig.ControlConfig.Runtime = &config.ControlRuntime{AgentReady: agentReady}
serverConfig.ControlConfig.Token = cfg.Token
serverConfig.ControlConfig.AgentToken = cfg.AgentToken
serverConfig.ControlConfig.JoinURL = cfg.ServerURL
Expand Down Expand Up @@ -415,6 +419,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
}()

if cfg.DisableAgent {
close(agentReady)
<-ctx.Done()
return nil
}
Expand All @@ -431,6 +436,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
}

agentConfig := cmds.AgentConfig
agentConfig.AgentReady = agentReady
agentConfig.Debug = app.GlobalBool("debug")
agentConfig.DataDir = filepath.Dir(serverConfig.ControlConfig.DataDir)
agentConfig.ServerURL = url
Expand Down
2 changes: 2 additions & 0 deletions pkg/daemons/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ type ControlRuntime struct {

HTTPBootstrap bool
APIServerReady <-chan struct{}
AgentReady <-chan struct{}
ETCDReady <-chan struct{}
ClusterControllerStart func(ctx context.Context) error
LeaderElectedClusterControllerStart func(ctx context.Context) error
Expand All @@ -219,6 +220,7 @@ type ControlRuntime struct {
ServingKubeletKey string
ServerToken string
AgentToken string
APIServer http.Handler
Handler http.Handler
Tunnel http.Handler
Authenticator authenticator.Request
Expand Down
37 changes: 19 additions & 18 deletions pkg/daemons/control/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"math/rand"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
Expand All @@ -22,7 +21,6 @@ import (
authorizationv1 "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/client-go/kubernetes"
authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1"
"k8s.io/client-go/tools/clientcmd"
Expand All @@ -37,9 +35,7 @@ var localhostIP = net.ParseIP("127.0.0.1")

func Server(ctx context.Context, cfg *config.Control) error {
rand.Seed(time.Now().UTC().UnixNano())

runtime := &config.ControlRuntime{}
cfg.Runtime = runtime
runtime := cfg.Runtime

if err := prepare(ctx, cfg, runtime); err != nil {
return errors.Wrap(err, "preparing server")
Expand All @@ -48,27 +44,23 @@ func Server(ctx context.Context, cfg *config.Control) error {
cfg.Runtime.Tunnel = setupTunnel()
proxyutil.DisableProxyHostnameCheck = true

var auth authenticator.Request
var handler http.Handler
var err error
basicAuth, err := basicAuthenticator(runtime.PasswdFile)
if err != nil {
return err
}
runtime.Authenticator = basicAuth

if !cfg.DisableAPIServer {
auth, handler, err = apiServer(ctx, cfg, runtime)
if err != nil {
go waitForAPIServerHandlers(ctx, runtime)

if err := apiServer(ctx, cfg, runtime); err != nil {
return err
}

if err := waitForAPIServerInBackground(ctx, runtime); err != nil {
return err
}
}
basicAuth, err := basicAuthenticator(runtime.PasswdFile)
if err != nil {
return err
}

runtime.Authenticator = combineAuthenticators(basicAuth, auth)
runtime.Handler = handler

if !cfg.DisableScheduler {
if err := scheduler(ctx, cfg, runtime); err != nil {
Expand Down Expand Up @@ -145,7 +137,7 @@ func scheduler(ctx context.Context, cfg *config.Control, runtime *config.Control
return executor.Scheduler(ctx, runtime.APIServerReady, args)
}

func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) (authenticator.Request, http.Handler, error) {
func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{
"feature-gates": "JobTrackingWithFinalizers=true",
}
Expand Down Expand Up @@ -381,6 +373,15 @@ func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.Cont
return err
}

// waitForAPIServerHandlers asks the executor for the apiserver's request
// authenticator and HTTP handler and publishes them on the control runtime.
//
// The authenticator already stored on runtime is combined with the one
// returned by the apiserver, and the handler is stored in runtime.APIServer.
// If the executor returns an error the process is terminated via
// logrus.Fatalf, so this function has no error return.
func waitForAPIServerHandlers(ctx context.Context, runtime *config.ControlRuntime) {
	apiAuth, apiHandler, handlersErr := executor.APIServerHandlers(ctx)
	if handlersErr != nil {
		logrus.Fatalf("Failed to get request handlers from apiserver: %v", handlersErr)
	}

	// Layer the apiserver authenticator on top of whatever is already configured.
	combined := combineAuthenticators(runtime.Authenticator, apiAuth)
	runtime.Authenticator = combined
	runtime.APIServer = apiHandler
}

func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions pkg/daemons/executor/embed.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,17 @@ func (Embedded) KubeProxy(ctx context.Context, args []string) error {
return nil
}

func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) {
<-etcdReady
// APIServerHandlers receives one value from app.StartupConfig and returns the
// authenticator and HTTP handler it carries. The receive blocks until a value
// is available; ctx is not consulted, and the returned error is always nil.
func (Embedded) APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) {
	received := <-app.StartupConfig
	authn, handler := received.Authenticator, received.Handler
	return authn, handler, nil
}

func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
command := app.NewAPIServerCommand(ctx.Done())
command.SetArgs(args)

go func() {
<-etcdReady
defer func() {
if err := recover(); err != nil {
logrus.Fatalf("apiserver panic: %v", err)
Expand All @@ -84,8 +89,7 @@ func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args [
logrus.Fatalf("apiserver exited: %v", command.ExecuteContext(ctx))
}()

startupConfig := <-app.StartupConfig
return startupConfig.Authenticator, startupConfig.Handler, nil
return nil
}

func (Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemons/executor/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (e Embedded) ETCD(ctx context.Context, args ETCDConfig) error {
}
etcd, err := embed.StartEtcd(cfg)
if err != nil {
return nil
return err
}

go func() {
Expand Down
9 changes: 7 additions & 2 deletions pkg/daemons/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type Executor interface {
Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error
Kubelet(ctx context.Context, args []string) error
KubeProxy(ctx context.Context, args []string) error
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error)
APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error)
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error
Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error
ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error
CurrentETCDOptions() (InitialOptions, error)
Expand Down Expand Up @@ -97,7 +98,11 @@ func KubeProxy(ctx context.Context, args []string) error {
return executor.KubeProxy(ctx, args)
}

func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) {
// APIServerHandlers returns the apiserver's request authenticator and HTTP
// handler by delegating to the package-level executor's APIServerHandlers
// method, passing ctx through unchanged.
func APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) {
return executor.APIServerHandlers(ctx)
}

// APIServer starts the apiserver with the given arguments by delegating to the
// package-level executor's APIServer method. The etcdReady channel and args are
// passed through unchanged; the handlers are no longer returned from here and
// are obtained separately via APIServerHandlers.
func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
return executor.APIServer(ctx, etcdReady, args)
}

Expand Down
31 changes: 20 additions & 11 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (e *ETCD) IsInitialized(ctx context.Context, config *config.Control) (bool,
func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
// Wait for etcd to come up as a new single-node cluster, then exit
go func() {
<-e.runtime.AgentReady
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for range t.C {
Expand Down Expand Up @@ -291,8 +292,14 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
return e.newCluster(ctx, false)
}

err = e.join(ctx, clientAccessInfo)
return errors.Wrap(err, "joining etcd cluster")
go func() {
<-e.runtime.AgentReady
if err := e.join(ctx, clientAccessInfo); err != nil {
logrus.Fatalf("ETCD join failed: %v", err)
}
}()

return nil
}

// join attempts to add a member to an existing cluster
Expand Down Expand Up @@ -335,9 +342,9 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
// make sure to remove the name file if a duplicate node name is used
nameFile := nameFile(e.config)
if err := os.Remove(nameFile); err != nil {
return err
logrus.Errorf("Failed to remove etcd name file %s: %v", nameFile, err)
}
return errors.New("Failed to join etcd cluster due to duplicate node names, please use unique node name for the server")
return errors.New("duplicate node name found, please use a unique name for this node")
}
for _, peer := range member.PeerURLs {
u, err := url.Parse(peer)
Expand All @@ -358,7 +365,7 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
}

if add {
logrus.Infof("Adding %s to etcd cluster %v", e.peerURL(), cluster)
logrus.Infof("Adding member %s=%s to etcd cluster %v", e.name, e.peerURL(), cluster)
if _, err = client.MemberAddAsLearner(clientCtx, []string{e.peerURL()}); err != nil {
return err
}
Expand Down Expand Up @@ -444,7 +451,7 @@ func (e *ETCD) handler(next http.Handler) http.Handler {
return mux
}

// infoHandler returns etcd cluster information. This is used by new members when joining the custer.
// infoHandler returns etcd cluster information. This is used by new members when joining the cluster.
func (e *ETCD) infoHandler() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 2*time.Second)
Expand Down Expand Up @@ -500,6 +507,10 @@ func getClientConfig(ctx context.Context, runtime *config.ControlRuntime, endpoi
// toTLSConfig converts the ControlRuntime configuration to TLS configuration suitable
// for use by etcd.
func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) {
if runtime.ClientETCDCert == "" || runtime.ClientETCDKey == "" || runtime.ETCDServerCA == "" {
return nil, errors.New("runtime is not ready yet")
}

clientCert, err := tls.LoadX509KeyPair(runtime.ClientETCDCert, runtime.ClientETCDKey)
if err != nil {
return nil, err
Expand Down Expand Up @@ -533,8 +544,8 @@ func GetAdvertiseAddress(advertiseIP string) (string, error) {
// newCluster returns options to set up etcd for a new cluster
func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
err := e.cluster(ctx, reset, executor.InitialOptions{
AdvertisePeerURL: fmt.Sprintf("https://%s:2380", e.address),
Cluster: fmt.Sprintf("%s=https://%s:2380", e.name, e.address),
AdvertisePeerURL: e.peerURL(),
Cluster: fmt.Sprintf("%s=%s", e.name, e.peerURL()),
State: "new",
})
if err != nil {
Expand Down Expand Up @@ -684,6 +695,7 @@ func (e *ETCD) RemovePeer(ctx context.Context, name, address string, allowSelfRe
// being promoted to full voting member. The checks only run on the cluster member that is
// the etcd leader.
func (e *ETCD) manageLearners(ctx context.Context) error {
<-e.runtime.AgentReady
t := time.NewTicker(manageTickerTime)
defer t.Stop()

Expand Down Expand Up @@ -1377,9 +1389,6 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error)
// GetAPIServerURLFromETCD will try to fetch the version.Program/apiaddresses key from etcd.
// When it succeeds, it parses the first address in the list and returns that address.
func GetAPIServerURLFromETCD(ctx context.Context, cfg *config.Control) (string, error) {
if cfg.Runtime == nil {
return "", fmt.Errorf("runtime is not ready yet")
}
cl, err := GetClient(ctx, cfg.Runtime, endpoint)
if err != nil {
return "", err
Expand Down
Loading

0 comments on commit 3988ede

Please sign in to comment.