Skip to content

Commit

Permalink
enhancement: Upgrade to nodeutil (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
helayoty authored Aug 22, 2022
1 parent 86ea2a9 commit 93fa2ca
Show file tree
Hide file tree
Showing 15 changed files with 820 additions and 399 deletions.
27 changes: 27 additions & 0 deletions cmd/virtual-kubelet/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"context"
"os"
"os/signal"
)

func BaseContext(ctx context.Context) (context.Context, func()) {
sigC := make(chan os.Signal, 1)
ctx, cancel := context.WithCancel(ctx)

go func() {
for {
select {
case <-ctx.Done():
signal.Stop(sigC)
return
case <-sigC:
cancel()
}
}
}()

signal.Notify(sigC, cancelSigs()...)
return ctx, cancel
}
304 changes: 259 additions & 45 deletions cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,71 +16,285 @@ package main

import (
"context"
"crypto/tls"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/mitchellh/go-homedir"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
azprovider "github.com/virtual-kubelet/azure-aci/provider"
cli "github.com/virtual-kubelet/node-cli"
logruscli "github.com/virtual-kubelet/node-cli/logrus"
opencensuscli "github.com/virtual-kubelet/node-cli/opencensus"
"github.com/virtual-kubelet/node-cli/opts"
"github.com/virtual-kubelet/node-cli/provider"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/node"
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
v1 "k8s.io/api/core/v1"

"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"github.com/virtual-kubelet/virtual-kubelet/trace/opencensus"
"k8s.io/klog"
)

var (
buildVersion = "N/A"
buildTime = "N/A"
k8sVersion = "v1.19.10" // This should follow the version of k8s.io/client-go we are importing
numberOfWorkers = 50
buildVersion = "N/A"
k8sVersion = "v1.19.10" // This should follow the version of k8s.io/client-go we are importing
)

func main() {
ctx := cli.ContextWithCancelOnSignal(context.Background())
prog := filepath.Base(os.Args[0])
desc := prog + " implements a node on a Kubernetes cluster using Azure Container Instances to run pods."

var (
taintKey = envOrDefault("VKUBELET_TAINT_KEY", "virtual-kubelet.io/provider")
taintEffect = envOrDefault("VKUBELET_TAINT_EFFECT", string(v1.TaintEffectNoSchedule))
taintValue = envOrDefault("VKUBELET_TAINT_VALUE", "azure")

logLevel = "info"
traceSampleRate string

// for aci
kubeconfigPath = os.Getenv("KUBECONFIG")
cfgPath string
clusterDomain = "cluster.local"
startupTimeout time.Duration
disableTaint bool
operatingSystem = "Linux"
numberOfWorkers = 50
resync time.Duration

certPath = os.Getenv("APISERVER_CERT_LOCATION")
keyPath = os.Getenv("APISERVER_KEY_LOCATION")
clientCACert = os.Getenv("APISERVER_CA_CERT_LOCATION")
clientNoVerify bool

logger := logrus.StandardLogger()
log.L = logruslogger.FromLogrus(logrus.NewEntry(logger))
logConfig := &logruscli.Config{LogLevel: "info"}
webhookAuth bool
webhookAuthnCacheTTL time.Duration
webhookAuthzUnauthedCacheTTL time.Duration
webhookAuthzAuthedCacheTTL time.Duration
nodeName string
listenPort = 10250

// deprecated
namespace string
metricsAddr string
provider string
leases bool
)

if kubeconfigPath == "" {
home, _ := homedir.Dir()
if home != "" {
kubeconfigPath = filepath.Join(home, ".kube", "config")
}
}

trace.T = opencensus.Adapter{}
traceConfig := opencensuscli.Config{
AvailableExporters: map[string]opencensuscli.ExporterInitFunc{
"ocagent": initOCAgent,
withTaint := func(cfg *nodeutil.NodeConfig) error {
if disableTaint {
return nil
}

taint := v1.Taint{
Key: taintKey,
Value: taintValue,
}
switch taintEffect {
case "NoSchedule":
taint.Effect = v1.TaintEffectNoSchedule
case "NoExecute":
taint.Effect = v1.TaintEffectNoExecute
case "PreferNoSchedule":
taint.Effect = v1.TaintEffectPreferNoSchedule
default:
return errdefs.InvalidInputf("taint effect %q is not supported", taintEffect)
}
cfg.NodeSpec.Spec.Taints = append(cfg.NodeSpec.Spec.Taints, taint)
return nil
}
withVersion := func(cfg *nodeutil.NodeConfig) error {
cfg.NodeSpec.Status.NodeInfo.KubeletVersion = strings.Join([]string{k8sVersion, "vk-azure-aci", buildVersion}, "-")
return nil
}

withWebhookAuth := func(cfg *nodeutil.NodeConfig) error {
if !webhookAuth {
return nil
}
auth, err := nodeutil.WebhookAuth(cfg.Client, cfg.NodeSpec.Name, func(cfg *nodeutil.WebhookAuthConfig) error {
if webhookAuthnCacheTTL > 0 {
cfg.AuthnConfig.CacheTTL = webhookAuthnCacheTTL
}
if webhookAuthzAuthedCacheTTL > 0 {
cfg.AuthzConfig.AllowCacheTTL = webhookAuthzAuthedCacheTTL
}
if webhookAuthzUnauthedCacheTTL > 0 {
cfg.AuthzConfig.AllowCacheTTL = webhookAuthzUnauthedCacheTTL
}
return nil
})
if err != nil {
return err
}
cfg.TLSConfig.ClientAuth = tls.RequestClientCert
cfg.Handler = nodeutil.WithAuth(auth, cfg.Handler)
return nil
}

withCA := func(cfg *tls.Config) error {
if clientCACert == "" {
return nil
}
if err := nodeutil.WithCAFromPath(clientCACert)(cfg); err != nil {
return fmt.Errorf("error getting CA from path: %w", err)
}
if clientNoVerify {
cfg.ClientAuth = tls.NoClientCert
}
return nil
}
withClient := func(cfg *nodeutil.NodeConfig) error {
client, err := nodeutil.ClientsetFromEnv(kubeconfigPath)
if err != nil {
return err
}
return nodeutil.WithClient(client)(cfg)
}

run := func(ctx context.Context) error {
if err := configureTracing(nodeName, traceSampleRate); err != nil {
return err
}
node, err := nodeutil.NewNode(nodeName, func(cfg nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
if p := os.Getenv("KUBELET_PORT"); p != "" {
var err error
listenPort, err = strconv.Atoi(p)
if err != nil {
return nil, nil, err
}
}
p, err := azprovider.NewACIProvider(cfgPath, cfg, os.Getenv("VKUBELET_POD_IP"), int32(listenPort), clusterDomain)
return p, nil, err
},
withClient,
withTaint,
withVersion,
nodeutil.WithTLSConfig(nodeutil.WithKeyPairFromPath(certPath, keyPath), withCA),
withWebhookAuth,
func(cfg *nodeutil.NodeConfig) error {
cfg.InformerResyncPeriod = resync
cfg.NumWorkers = numberOfWorkers
cfg.HTTPListenAddr = fmt.Sprintf(":%d", listenPort)
return nil
},
)
if err != nil {
return err
}

go func() error {
err = node.Run(ctx)
if err != nil {
return fmt.Errorf("error running the node: %w", err)
}
return nil
}()

if err := node.WaitReady(ctx, startupTimeout); err != nil {
return fmt.Errorf("error waiting for node to be ready: %w", err)
}

<-node.Done()
return node.Err()
}

o, err := opts.FromEnv()
if err != nil {
log.G(ctx).Fatal(err)
cmd := &cobra.Command{
Use: prog,
Short: desc,
Long: desc,
Run: func(cmd *cobra.Command, args []string) {
logger := logrus.StandardLogger()
lvl, err := logrus.ParseLevel(logLevel)
if err != nil {
logrus.WithError(err).Fatal("Error parsing log level")
}
logger.SetLevel(lvl)

ctx := log.WithLogger(cmd.Context(), logruslogger.FromLogrus(logrus.NewEntry(logger)))

if err := run(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
log.G(ctx).Fatal(err)
}
log.G(ctx).Debug(err)
}
},
}
o.Provider = "azure"
o.Version = strings.Join([]string{k8sVersion, "vk-azure-aci", buildVersion}, "-")
o.PodSyncWorkers = numberOfWorkers

node, err := cli.New(ctx,
cli.WithBaseOpts(o),
cli.WithCLIVersion(buildVersion, buildTime),
cli.WithProvider("azure", func(cfg provider.InitConfig) (provider.Provider, error) {
return azprovider.NewACIProvider(cfg.ConfigPath, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem, cfg.InternalIP, cfg.DaemonPort, cfg.KubeClusterDomain)
}),
cli.WithPersistentFlags(logConfig.FlagSet()),
cli.WithPersistentPreRunCallback(func() error {
return logruscli.Configure(logConfig, logger)
}),
cli.WithPersistentFlags(traceConfig.FlagSet()),
cli.WithPersistentPreRunCallback(func() error {
return opencensuscli.Configure(ctx, &traceConfig, o)
}),
)

if err != nil {
log.G(ctx).Fatal(err)
flags := cmd.Flags()

klogFlags := flag.NewFlagSet("klog", flag.ContinueOnError)
klog.InitFlags(klogFlags)
klogFlags.VisitAll(func(f *flag.Flag) {
f.Name = "klog." + f.Name
flags.AddGoFlag(f)
})

flags.StringVar(&nodeName, "nodename", nodeName, "kubernetes node name")
flags.StringVar(&cfgPath, "provider-config", cfgPath, "cloud provider configuration file")
flags.StringVar(&clusterDomain, "cluster-domain", clusterDomain, "kubernetes cluster-domain")
flag.DurationVar(&startupTimeout, "startup-timeout", startupTimeout, "How long to wait for the virtual-kubelet to start")
flags.BoolVar(&disableTaint, "disable-taint", disableTaint, "disable the node taint")
flag.StringVar(&operatingSystem, "os", operatingSystem, "Operating System (Linux/Windows)")
flags.IntVar(&numberOfWorkers, "pod-sync-workers", numberOfWorkers, `set the number of pod synchronization workers`)
flags.DurationVar(&resync, "full-resync-period", resync, "how often to perform a full resync of pods between kubernetes and the provider")

flags.StringVar(&clientCACert, "client-verify-ca", clientCACert, "CA cert to use to verify client requests")
flags.BoolVar(&clientNoVerify, "no-verify-clients", clientNoVerify, "Do not require client certificate validation")
flags.BoolVar(&webhookAuth, "authentication-token-webhook", webhookAuth, ""+
"Use the TokenReview API to determine authentication for bearer tokens.")
flags.DurationVar(&webhookAuthnCacheTTL, "authentication-token-webhook-cache-ttl", webhookAuthnCacheTTL,
"The duration to cache responses from the webhook token authenticator.")
flags.DurationVar(&webhookAuthzAuthedCacheTTL, "authorization-webhook-cache-authorized-ttl", webhookAuthzAuthedCacheTTL,
"The duration to cache 'authorized' responses from the webhook authorizer.")
flags.DurationVar(&webhookAuthzUnauthedCacheTTL, "authorization-webhook-cache-unauthorized-ttl", webhookAuthzUnauthedCacheTTL,
"The duration to cache 'unauthorized' responses from the webhook authorizer.")

flags.StringVar(&traceSampleRate, "trace-sample-rate", traceSampleRate, "set probability of tracing samples")

// deprecated flags
flags.StringVar(&namespace, "namespace", namespace, "set namespace to watch for pods")
flags.MarkDeprecated("namespace", "cannot set namespace, all namespaces watched")
flags.MarkHidden("namespace")
flags.StringVar(&metricsAddr, "metrics-addr", metricsAddr, "address to listen for metrics/stats requests")
flags.MarkDeprecated("metrics-addr", "metrics are only available on the main api port")
flags.MarkHidden("metrics-addr")
flags.StringVar(&provider, "provider", provider, "cloud provider")
flags.MarkDeprecated("provider", "only one provider is supported")
flags.MarkHidden("provider")
flags.BoolVar(&leases, "enable-node-lease", leases, "use node leases for heartbeats")
flags.MarkDeprecated("leases", "Leases are always enabled")
flags.MarkHidden("leases")
flags.StringVar(&taintKey, "taint", taintKey, "Set node taint key")
flags.MarkDeprecated("taint", "Taint key should now be configured using the VKUBELET_TAINT_KEY environment variable")

ctx, cancel := BaseContext(context.Background())
defer cancel()

if err := cmd.ExecuteContext(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
logrus.WithError(err).Fatal("Error running command")
}
}
}

if err := node.Run(ctx); err != nil {
log.G(ctx).Fatal(err)
func envOrDefault(key string, defaultValue string) string {
v, set := os.LookupEnv(key)
if set {
return v
}
return defaultValue
}
Loading

0 comments on commit 93fa2ca

Please sign in to comment.