Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Topology updater: TM policy from configz endpoint #530

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ HOSTMOUNT_PREFIX ?= /

KUBECONFIG ?=
E2E_TEST_CONFIG ?=
PULL_IF_NOT_PRESENT ?=

LDFLAGS = -ldflags "-s -w -X sigs.k8s.io/node-feature-discovery/pkg/version.version=$(VERSION) -X sigs.k8s.io/node-feature-discovery/source.pathPrefix=$(HOSTMOUNT_PREFIX)"

Expand Down Expand Up @@ -131,10 +132,12 @@ test:
e2e-test:
@if [ -z ${KUBECONFIG} ]; then echo "[ERR] KUBECONFIG missing, must be defined"; exit 1; fi
$(GO_CMD) test -v ./test/e2e/ -args -nfd.repo=$(IMAGE_REPO) -nfd.tag=$(IMAGE_TAG_NAME) \
-kubeconfig=$(KUBECONFIG) -nfd.e2e-config=$(E2E_TEST_CONFIG) -ginkgo.focus="\[kubernetes-sigs\]" \
-kubeconfig=$(KUBECONFIG) -nfd.e2e-config=$(E2E_TEST_CONFIG) -nfd.pull-if-not-present=$(PULL_IF_NOT_PRESENT) \
-ginkgo.focus="\[kubernetes-sigs\]" \
$(if $(OPENSHIFT),-nfd.openshift,)
$(GO_CMD) test -v ./test/e2e/ -args -nfd.repo=$(IMAGE_REPO) -nfd.tag=$(IMAGE_TAG_NAME)-minimal \
-kubeconfig=$(KUBECONFIG) -nfd.e2e-config=$(E2E_TEST_CONFIG) -ginkgo.focus="\[kubernetes-sigs\]" \
-kubeconfig=$(KUBECONFIG) -nfd.e2e-config=$(E2E_TEST_CONFIG) -nfd.pull-if-not-present=$(PULL_IF_NOT_PRESENT) \
-ginkgo.focus="\[kubernetes-sigs\]" \
$(if $(OPENSHIFT),-nfd.openshift,)

push:
Expand Down
32 changes: 18 additions & 14 deletions cmd/nfd-topology-updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ const (
ProgramName = "nfd-topology-updater"
)

// Options to obtain Kubelet config file
const (
kubeletFile = "kubelet-config-file"
configz = "configz-endpoint"
)

func main() {
flags := flag.NewFlagSet(ProgramName, flag.ExitOnError)

Expand All @@ -58,12 +64,15 @@ func main() {
// Plug klog into grpc logging infrastructure
utils.ConfigureGrpcKlog()

klConfig, err := kubeconf.GetKubeletConfigFromLocalFile(resourcemonitorArgs.KubeletConfigFile)
if err != nil {
klog.Fatalf("error reading kubelet config: %v", err)
}
tmPolicy := string(topologypolicy.DetectTopologyPolicy(klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope))
klog.Infof("detected kubelet Topology Manager policy %q", tmPolicy)
var tmPolicy string
if resourcemonitorArgs.KubeletConfigObtainOpt == kubeletFile {
klConfig, err := kubeconf.GetKubeletConfigFromLocalFile(resourcemonitorArgs.KubeletConfigFile)
if err != nil {
klog.Fatalf("error reading kubelet config: %v", err)
}
tmPolicy = string(topologypolicy.DetectTopologyPolicy(klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope))
klog.Infof("detected kubelet Topology Manager policy %q", tmPolicy)
} // Otherwise get TopologyManagerPolicy from configz-endpoint

// Get new TopologyUpdater instance
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, tmPolicy)
Expand Down Expand Up @@ -109,22 +118,17 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
"Time to sleep between CR updates. Non-positive value implies no CR updatation (i.e. infinite sleep). [Default: 60s]")
flagset.StringVar(&resourcemonitorArgs.Namespace, "watch-namespace", "*",
"Namespace to watch pods (for testing/debugging purpose). Use * for all namespaces.")
flagset.StringVar(&resourcemonitorArgs.KubeletConfigObtainOpt, "obtain-kubelet-config", kubeletFile, "Options to obtain Kubelet config file. [Possible arguments: kubelet-config-file/configz-endpoint]")
flagset.StringVar(&resourcemonitorArgs.KubeletConfigFile, "kubelet-config-file", source.VarDir.Path("lib/kubelet/config.yaml"),
"Kubelet config file path.")
flagset.StringVar(&resourcemonitorArgs.PodResourceSocketPath, "podresources-socket", source.VarDir.Path("lib/kubelet/pod-resources/kubelet.sock"),
"Pod Resource Socket path to use.")
flagset.StringVar(&args.Server, "server", "localhost:8080",
"NFD server address to connecto to.")
"NFD server address to connect to.")
flagset.StringVar(&args.ServerNameOverride, "server-name-override", "",
"Hostname expected from server certificate, useful in testing")

initKlogFlags(flagset)
klog.InitFlags(flagset)

return args, resourcemonitorArgs
}

func initKlogFlags(flagset *flag.FlagSet) {
flags := flag.NewFlagSet("klog flags", flag.ExitOnError)
//flags.SetOutput(ioutil.Discard)
klog.InitFlags(flags)
}
10 changes: 10 additions & 0 deletions cmd/nfd-topology-updater/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestArgsParse(t *testing.T) {
So(args.Oneshot, ShouldBeTrue)
So(finderArgs.SleepInterval, ShouldEqual, 60*time.Second)
So(finderArgs.KubeletConfigFile, ShouldEqual, "/var/lib/kubelet/config.yaml")
So(finderArgs.KubeletConfigObtainOpt, ShouldEqual, kubeletFile)
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/var/lib/kubelet/pod-resources/kubelet.sock")
})
})
Expand Down Expand Up @@ -100,5 +101,14 @@ func TestArgsParse(t *testing.T) {
So(finderArgs.PodResourceSocketPath, ShouldEqual, "/path/testkubelet.sock")
})
})

Convey("When configz-endpoint specified", func() {
_, finderArgs := parseArgs(flags,
"--obtain-kubelet-config=configz-endpoint")

Convey("obtain-kubelet-config option should be configz-endpoint", func() {
So(finderArgs.KubeletConfigObtainOpt, ShouldEqual, configz)
})
})
})
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/klauspost/cpuid/v2 v2.0.9
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.1
github.com/pkg/errors v0.9.1
github.com/smartystreets/assertions v1.2.0
github.com/smartystreets/goconvey v1.6.4
github.com/stretchr/testify v1.7.0
Expand All @@ -20,12 +21,15 @@ require (
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
google.golang.org/grpc v1.38.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.22.0
k8s.io/apiextensions-apiserver v0.22.0
k8s.io/apimachinery v0.22.0
k8s.io/client-go v0.22.0
k8s.io/klog/v2 v2.9.0
k8s.io/kubelet v0.0.0
k8s.io/kubernetes v1.22.0
k8s.io/utils v0.0.0-20210707171843-4b05e18ac7d9
sigs.k8s.io/yaml v1.2.0
)

Expand Down
6 changes: 5 additions & 1 deletion pkg/apihelper/apihelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
api "k8s.io/api/core/v1"
k8sclient "k8s.io/client-go/kubernetes"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)

// APIHelpers represents a set of API helpers for Kubernetes
Expand All @@ -47,6 +48,9 @@ type APIHelpers interface {
// GetTopologyClient returns a topologyclientset
GetTopologyClient() (*topologyclientset.Clientset, error)

// GetPod returns the Kubernetes pod in a namepace with a name.
// GetPod returns the Kubernetes pod in a namespace with a name.
GetPod(*k8sclient.Clientset, string, string) (*api.Pod, error)

// GetKubeletConfig returns node's kubelet config file
GetKubeletConfig(c *k8sclient.Clientset, nodeName string) (*kubeletconfig.KubeletConfiguration, error)
}
46 changes: 45 additions & 1 deletion pkg/apihelper/k8shelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ import (
"context"
"encoding/json"

topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
api "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8sclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletconfigscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"

topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
"github.com/pkg/errors"
)

// Implements APIHelpers
Expand Down Expand Up @@ -132,3 +137,42 @@ func (h K8sHelpers) GetPod(cli *k8sclient.Clientset, namespace string, podName s

return pod, nil
}

func (h K8sHelpers) GetKubeletConfig(c *k8sclient.Clientset, nodeName string) (*kubeletconfig.KubeletConfiguration, error) {
//request here is the same as: curl https://$APISERVER:6443/api/v1/nodes/$NODE_NAME/proxy/configz
res := c.CoreV1().RESTClient().Get().Resource("nodes").Name(nodeName).SubResource("proxy").Suffix("configz").Do(context.TODO())
return decodeConfigz(&res)
}

// Decodes the rest result from /configz and returns a kubeletconfig.KubeletConfiguration (internal type).
func decodeConfigz(res *restclient.Result) (*kubeletconfig.KubeletConfiguration, error) {
// This hack because /configz reports the following structure:
// {"kubeletconfig": {the JSON representation of kubeletconfigv1beta1.KubeletConfiguration}}
type configzWrapper struct {
ComponentConfig kubeletconfigv1beta1.KubeletConfiguration `json:"kubeletconfig"`
}

configz := configzWrapper{}
kubeCfg := kubeletconfig.KubeletConfiguration{}

contentsBytes, err := res.Raw()
if err != nil {
return nil, err
}

err = json.Unmarshal(contentsBytes, &configz)
if err != nil {
return nil, errors.Wrap(err, string(contentsBytes))
}

scheme, _, err := kubeletconfigscheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
err = scheme.Convert(&configz.ComponentConfig, &kubeCfg, nil)
if err != nil {
return nil, err
}

return &kubeCfg, nil
}
24 changes: 24 additions & 0 deletions pkg/apihelper/mock_APIHelpers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions pkg/nfd-master/nfd-master.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,21 @@ func (m *nfdMaster) UpdateNodeTopology(c context.Context, r *topologypb.NodeTopo
} else {
klog.Infof("received CR updation request for node %q", r.NodeName)
}
if len(r.TopologyPolicies[0]) == 0 {
klog.Warningf("Using configz-endpoint in order to get Kubelet configuration, consider to be unstable")
cli, err := m.apihelper.GetClient()
if err != nil {
klog.Errorf("%v", err.Error())
return &topologypb.NodeTopologyResponse{}, err
}
kc, err := m.apihelper.GetKubeletConfig(cli, r.NodeName)
if err != nil {
klog.Errorf("failed to get Kubelet config: %v", err.Error())
return &topologypb.NodeTopologyResponse{}, err
}
r.TopologyPolicies[0] = kc.TopologyManagerPolicy
klog.Infof("detected topology policy: %q", kc.TopologyManagerPolicy)
}
if !m.args.NoPublish {
err := m.updateCR(r.NodeName, r.TopologyPolicies, r.Zones, m.args.NRTNamespace)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions pkg/resourcemonitor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (

// Args stores commandline arguments used for resource monitoring
type Args struct {
PodResourceSocketPath string
SleepInterval time.Duration
Namespace string
KubeletConfigFile string
PodResourceSocketPath string
SleepInterval time.Duration
Namespace string
KubeletConfigFile string
KubeletConfigObtainOpt string
}

// ResourceInfo stores information of resources and their corresponding IDs obtained from PodResource API
Expand Down
Loading