Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.31] Rework loadbalancer server selection logic #11457

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions pkg/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,24 @@ func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy
return disabled
}

// APIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations.
// WaitForAPIServers returns a list of apiserver endpoints, suitable for seeding client loadbalancer configurations.
// This function will block until it can return a populated list of apiservers, or if the remote server returns
// an error (indicating that it does not support this functionality).
func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string {
func WaitForAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []string {
var addresses []string
var info *clientaccess.Info
var err error

_ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
addresses, err = getAPIServers(ctx, node, proxy)
if info == nil {
withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
info, err = clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
if err != nil {
logrus.Warnf("Failed to validate server token: %v", err)
return false, nil
}
}
addresses, err = GetAPIServers(ctx, info)
if err != nil {
logrus.Infof("Failed to retrieve list of apiservers from server: %v", err)
return false, err
Expand Down Expand Up @@ -760,14 +769,8 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
return nodeConfig, nil
}

// getAPIServers attempts to return a list of apiservers from the server.
func getAPIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) ([]string, error) {
withCert := clientaccess.WithClientCertificate(node.AgentConfig.ClientKubeletCert, node.AgentConfig.ClientKubeletKey)
info, err := clientaccess.ParseAndValidateToken(proxy.SupervisorURL(), node.Token, withCert)
if err != nil {
return nil, err
}

// GetAPIServers attempts to return a list of apiservers from the server.
func GetAPIServers(ctx context.Context, info *clientaccess.Info) ([]string, error) {
data, err := info.Get("/v1-" + version.Program + "/apiservers")
if err != nil {
return nil, err
Expand Down
31 changes: 19 additions & 12 deletions pkg/agent/loadbalancer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,36 @@ import (
"github.com/k3s-io/k3s/pkg/agent/util"
)

// lbConfig stores loadbalancer state that should be persisted across restarts.
type lbConfig struct {
ServerURL string `json:"ServerURL"`
ServerAddresses []string `json:"ServerAddresses"`
}

func (lb *LoadBalancer) writeConfig() error {
configOut, err := json.MarshalIndent(lb, "", " ")
config := &lbConfig{
ServerURL: lb.scheme + "://" + lb.servers.getDefaultAddress(),
ServerAddresses: lb.servers.getAddresses(),
}
configOut, err := json.MarshalIndent(config, "", " ")
if err != nil {
return err
}
return util.WriteFile(lb.configFile, string(configOut))
}

func (lb *LoadBalancer) updateConfig() error {
writeConfig := true
if configBytes, err := os.ReadFile(lb.configFile); err == nil {
config := &LoadBalancer{}
config := &lbConfig{}
if err := json.Unmarshal(configBytes, config); err == nil {
if config.ServerURL == lb.ServerURL {
writeConfig = false
lb.setServers(config.ServerAddresses)
// if the default server from the config matches our current default,
// load the rest of the addresses as well.
if config.ServerURL == lb.scheme+"://"+lb.servers.getDefaultAddress() {
lb.Update(config.ServerAddresses)
return nil
}
}
}
if writeConfig {
if err := lb.writeConfig(); err != nil {
return err
}
}
return nil
// config didn't exist or used a different default server, write the current config to disk.
return lb.writeConfig()
}
70 changes: 70 additions & 0 deletions pkg/agent/loadbalancer/httpproxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package loadbalancer

import (
"fmt"
"net"
"net/url"
"os"
"strconv"
"time"

"github.com/k3s-io/k3s/pkg/version"
http_dialer "github.com/mwitkow/go-http-dialer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/net/http/httpproxy"
"golang.org/x/net/proxy"
)

var defaultDialer proxy.Dialer = &net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}

// SetHTTPProxy configures a proxy-enabled dialer to be used for all loadbalancer connections,
// if the agent has been configured to allow use of a HTTP proxy, and the environment has been configured
// to indicate use of a HTTP proxy for the server URL.
func SetHTTPProxy(address string) error {
// Check if env variable for proxy is set
if useProxy, _ := strconv.ParseBool(os.Getenv(version.ProgramUpper + "_AGENT_HTTP_PROXY_ALLOWED")); !useProxy || address == "" {
return nil
}

serverURL, err := url.Parse(address)
if err != nil {
return errors.Wrapf(err, "failed to parse address %s", address)
}

// Call this directly instead of using the cached environment used by http.ProxyFromEnvironment to allow for testing
proxyFromEnvironment := httpproxy.FromEnvironment().ProxyFunc()
proxyURL, err := proxyFromEnvironment(serverURL)
if err != nil {
return errors.Wrapf(err, "failed to get proxy for address %s", address)
}
if proxyURL == nil {
logrus.Debug(version.ProgramUpper + "_AGENT_HTTP_PROXY_ALLOWED is true but no proxy is configured for URL " + serverURL.String())
return nil
}

dialer, err := proxyDialer(proxyURL, defaultDialer)
if err != nil {
return errors.Wrapf(err, "failed to create proxy dialer for %s", proxyURL)
}

defaultDialer = dialer
logrus.Debugf("Using proxy %s for agent connection to %s", proxyURL, serverURL)
return nil
}

// proxyDialer creates a new proxy.Dialer that routes connections through the specified proxy.
func proxyDialer(proxyURL *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
if proxyURL.Scheme == "http" || proxyURL.Scheme == "https" {
// Create a new HTTP proxy dialer
httpProxyDialer := http_dialer.New(proxyURL, http_dialer.WithConnectionTimeout(10*time.Second), http_dialer.WithDialer(forward.(*net.Dialer)))
return httpProxyDialer, nil
} else if proxyURL.Scheme == "socks5" {
// For SOCKS5 proxies, use the proxy package's FromURL
return proxy.FromURL(proxyURL, forward)
}
return nil, fmt.Errorf("unsupported proxy scheme: %s", proxyURL.Scheme)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package loadbalancer

import (
"fmt"
"net"
"os"
"strings"
"testing"

"github.com/k3s-io/k3s/pkg/version"
"github.com/sirupsen/logrus"
"golang.org/x/net/proxy"
)

var originalDialer proxy.Dialer
var defaultEnv map[string]string
var proxyEnvs = []string{version.ProgramUpper + "_AGENT_HTTP_PROXY_ALLOWED", "HTTP_PROXY", "HTTPS_PROXY", "NO_PROXY", "http_proxy", "https_proxy", "no_proxy"}

Expand All @@ -19,7 +20,7 @@ func init() {
}

func prepareEnv(env ...string) {
defaultDialer = &net.Dialer{}
originalDialer = defaultDialer
defaultEnv = map[string]string{}
for _, e := range proxyEnvs {
if v, ok := os.LookupEnv(e); ok {
Expand All @@ -34,6 +35,7 @@ func prepareEnv(env ...string) {
}

func restoreEnv() {
defaultDialer = originalDialer
for _, e := range proxyEnvs {
if v, ok := defaultEnv[e]; ok {
os.Setenv(e, v)
Expand Down
Loading
Loading