diff --git a/tests/cmd/fault-trigger/main.go b/tests/cmd/fault-trigger/main.go index 8474c676a5..aec0e9ff8a 100644 --- a/tests/cmd/fault-trigger/main.go +++ b/tests/cmd/fault-trigger/main.go @@ -20,6 +20,8 @@ import ( _ "net/http/pprof" "time" + "github.com/pingcap/tidb-operator/tests/pkg/util" + "github.com/golang/glog" "github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/api" "github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/manager" @@ -28,13 +30,17 @@ import ( ) var ( - port int - pprofPort int + port int + pprofPort int + kubeProxyImage string + hostnameOverride string ) func init() { flag.IntVar(&port, "port", 23332, "The port that the fault trigger's http service runs on (default 23332)") flag.IntVar(&pprofPort, "pprof-port", 6060, "The port that the pprof's http service runs on (default 6060)") + flag.StringVar(&kubeProxyImage, "kube-proxy-image", "k8s.gcr.io/kube-proxy:v1.12.2", "The kube proxy image (default k8s.gcr.io/kube-proxy:v1.12.2)") + flag.StringVar(&hostnameOverride, "hostname-override", "", "If non-empty, will use this string as identification instead of the actual hostname") flag.Parse() } @@ -43,7 +49,15 @@ func main() { logs.InitLogs() defer logs.FlushLogs() - mgr := manager.NewManager() + mgr := manager.NewManager(kubeProxyImage) + hostname, err := util.GetHostname(hostnameOverride) + if err != nil { + glog.Fatalf("get hostname failed, err: %v", err) + } + err = mgr.UpdateKubeProxyDaemonset(hostname) + if err != nil { + glog.Fatalf("update kube-proxy daemonset failed, err: %v", err) + } server := api.NewServer(mgr, port) diff --git a/tests/pkg/fault-trigger/api/router.go b/tests/pkg/fault-trigger/api/router.go index bfeeb6916a..c60fe3800e 100644 --- a/tests/pkg/fault-trigger/api/router.go +++ b/tests/pkg/fault-trigger/api/router.go @@ -50,9 +50,9 @@ func (s *Server) newService() *restful.WebService { ws.Route(ws.POST(fmt.Sprintf("/%s/start", manager.KubeControllerManagerService)).To(s.startKubeControllerManager)) ws.Route(ws.POST(fmt.Sprintf("/%s/stop", manager.KubeControllerManagerService)).To(s.stopKubeControllerManager)) - // TODO: support kube-proxy - // ws.Route(ws.POST(fmt.Sprintf("/%s/start", manager.KubeProxyService)).To(s.startKubeProxy)) - // ws.Route(ws.POST(fmt.Sprintf("/%s/stop", manager.KubeProxyService)).To(s.stopKubeProxy)) + + ws.Route(ws.POST("/kube-proxy/{nodeName}/start").To(s.startKubeProxy)) + ws.Route(ws.POST("/kube-proxy/{nodeName}/stop").To(s.stopKubeProxy)) return ws } diff --git a/tests/pkg/fault-trigger/api/server.go b/tests/pkg/fault-trigger/api/server.go index 7726fc7f04..2bbcacdcf8 100644 --- a/tests/pkg/fault-trigger/api/server.go +++ b/tests/pkg/fault-trigger/api/server.go @@ -137,13 +137,33 @@ func (s *Server) stopKubeAPIServer(req *restful.Request, resp *restful.Response) s.action(req, resp, s.mgr.StopKubeAPIServer, "stopKubeAPIServer") } -// func (s *Server) startKubeProxy(req *restful.Request, resp *restful.Response) { -// s.action(req, resp, s.mgr.StartKubeProxy, "startKubeProxy") -// } -// -// func (s *Server) stopKubeProxy(req *restful.Request, resp *restful.Response) { -// s.action(req, resp, s.mgr.StopKubeProxy, "stopKubeProxy") -// } +func (s *Server) startKubeProxy(req *restful.Request, resp *restful.Response) { + res := newResponse("startKubeProxy") + nodeName := req.PathParameter("nodeName") + if len(nodeName) == 0 { + res.message(fmt.Sprintf("nodeName can't be empty")).statusCode(http.StatusBadRequest) + if err := resp.WriteEntity(res); err != nil { + glog.Errorf("failed to response, methods: startKubeProxy, error: %v", err) + } + return + } + + s.kubeProxyAction(req, resp, res, nodeName, s.mgr.StartKubeProxy, "startKubeProxy") +} + +func (s *Server) stopKubeProxy(req *restful.Request, resp *restful.Response) { + res := newResponse("stopKubeProxy") + nodeName := req.PathParameter("nodeName") + if len(nodeName) == 0 { + res.message(fmt.Sprintf("nodeName can't be empty")).statusCode(http.StatusBadRequest) + if err := resp.WriteEntity(res); err != nil { + glog.Errorf("failed to response, methods: stopKubeProxy, error: %v", err) + } + return + } + + s.kubeProxyAction(req, resp, res, nodeName, s.mgr.StopKubeProxy, "stopKubeProxy") +} func (s *Server) startKubeScheduler(req *restful.Request, resp *restful.Response) { s.action(req, resp, s.mgr.StartKubeScheduler, "startKubeScheduler") @@ -209,6 +229,30 @@ func (s *Server) vmAction( } } +func (s *Server) kubeProxyAction( + req *restful.Request, + resp *restful.Response, + res *Response, + nodeName string, + fn func(nodeName string) error, + method string, +) { + if err := fn(nodeName); err != nil { + res.message(fmt.Sprintf("failed to invoke %s, nodeName: %s, error: %v", method, nodeName, err)). + statusCode(http.StatusInternalServerError) + if err = resp.WriteEntity(res); err != nil { + glog.Errorf("failed to response, methods: %s, error: %v", method, err) + } + return + } + + res.message("OK").statusCode(http.StatusOK) + + if err := resp.WriteEntity(res); err != nil { + glog.Errorf("failed to response, method: %s, error: %v", method, err) + } +} + func (s *Server) getVM(name string) (*manager.VM, error) { vms, err := s.mgr.ListVMs() if err != nil { diff --git a/tests/pkg/fault-trigger/client/client.go b/tests/pkg/fault-trigger/client/client.go index 48a1bc5248..da42529e9f 100644 --- a/tests/pkg/fault-trigger/client/client.go +++ b/tests/pkg/fault-trigger/client/client.go @@ -41,10 +41,10 @@ type Client interface { // StopKubeControllerManager stops the kube-controller-manager service StopKubeControllerManager() error // TODO: support controller kube-proxy - // // StartKubeProxy starts the kube-proxy service - // StartKubeProxy() error - // // StopKubeProxy stops the kube-proxy service - // StopKubeProxy() error + // StartKubeProxy starts the kube-proxy of the specific node + StartKubeProxy(nodeName string) error + // // StopKubeProxy stops the kube-proxy of the specific node + StopKubeProxy(nodeName string) error // // StartKubeScheduler starts the kube-scheduler service } @@ -219,13 +219,25 @@ func (c *client) StopKubeAPIServer() error { return c.stopService(manager.KubeAPIServerService) } -// func (c *client) StartKubeProxy() error { -// return c.startService(manager.KubeProxyService) -// } -// -// func (c *client) StopKubeProxy() error { -// return c.stopService(manager.KubeProxyService) -// } +func (c *client) StartKubeProxy(nodeName string) error { + url := util.GenURL(fmt.Sprintf("%s%s/kube-proxy/%s/start", c.cfg.Addr, api.APIPrefix, nodeName)) + if _, err := c.post(url, nil); err != nil { + glog.Errorf("failed to post %s: %v", url, err) + return err + } + + return nil +} + +func (c *client) StopKubeProxy(nodeName string) error { + url := util.GenURL(fmt.Sprintf("%s%s/kube-proxy/%s/stop", c.cfg.Addr, api.APIPrefix, nodeName)) + if _, err := c.post(url, nil); err != nil { + glog.Errorf("failed to post %s: %v", url, err) + return err + } + + return nil +} func (c *client) StartKubeScheduler() error { return c.startService(manager.KubeSchedulerService) @@ -246,7 +258,7 @@ func (c *client) StopKubeControllerManager() error { func (c *client) startService(serviceName string) error { url := util.GenURL(fmt.Sprintf("%s%s/%s/start", c.cfg.Addr, api.APIPrefix, serviceName)) if _, err := c.post(url, nil); err != nil { - glog.Errorf("faled to post %s: %v", url, err) + glog.Errorf("failed to post %s: %v", url, err) return err } @@ -256,7 +268,7 @@ func (c *client) startService(serviceName string) error { func (c *client) stopService(serviceName string) error { url := util.GenURL(fmt.Sprintf("%s%s/%s/stop", c.cfg.Addr, api.APIPrefix, serviceName)) if _, err := c.post(url, nil); err != nil { - glog.Errorf("faled to post %s: %v", url, err) + glog.Errorf("failed to post %s: %v", url, err) return err } diff --git a/tests/pkg/fault-trigger/manager/kube_proxy.go b/tests/pkg/fault-trigger/manager/kube_proxy.go new file mode 100644 index 0000000000..6d961b00fe --- /dev/null +++ b/tests/pkg/fault-trigger/manager/kube_proxy.go @@ -0,0 +1,156 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + + "github.com/golang/glog" + + "github.com/pingcap/tidb-operator/tests/pkg/util" +) + +const ( + kubeProxyPath = "/etc/kubernetes/kube-proxy" + kubeProxyManifest = "kube-proxy-ds.yml" + kubeProxyLabel = "app.kubernetes.io/kube-proxy" +) + +// UpdateKubeProxyDaemonset add node affinity for previous kube-proxy daemonset +func (m *Manager) UpdateKubeProxyDaemonset(hostname string) error { + kubectlPath, err := getKubectlPath() + if err != nil { + glog.Infof("skip update kube-proxy daemonset, err: %v", err) + return nil + } + + // Get all k8s master nodes + k8sMasterNodes, err := util.ListK8sNodes(kubectlPath, fmt.Sprintf("%s=", util.LabelNodeRoleMaster)) + if err != nil { + return err + } + + // Select the first k8s master node to execute the update operation + if hostname != k8sMasterNodes[0] { + glog.Infof("not the first k8s master node, skip update kube-proxy daemonset, node name: %s", hostname) + return nil + } + + // Ensure kubeProxyPath exists + err = util.EnsureDirectoryExist(kubeProxyPath) + if err != nil { + return err + } + + // Render kub-proxy daemonset's template + kubeProxyManifestFullPath := filepath.Join(kubeProxyPath, kubeProxyManifest) + f, err := os.Create(kubeProxyManifestFullPath) + if err != nil { + return fmt.Errorf("create file %s failed, err: %v", kubeProxyManifestFullPath, err) + } + defer f.Close() + proxyDaemonSetBytes, err := util.ParseTemplate(KubeProxyDaemonSet, struct{ Image, KubeProxyLabel string }{ + Image: m.kubeProxyImages, + KubeProxyLabel: kubeProxyLabel, + }) + if err != nil { + return fmt.Errorf("parsing kube-proxy daemonset template failed, err: %v", err) + } + _, err = f.Write(proxyDaemonSetBytes) + if err != nil { + return fmt.Errorf("write kube-proxy daemonset manifest into file %s failed, err: %v", kubeProxyManifestFullPath, err) + } + + // Apply kube-proxy daemonset manifest + commandStr := fmt.Sprintf("%s apply -f %s", kubectlPath, kubeProxyManifestFullPath) + cmd := exec.Command("/bin/sh", "-c", commandStr) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("exec: [%s] failed, output: %s, error: %v", commandStr, string(output), err) + } + + // Add app.kubernetes.io/kube-proxy= label for all k8s nodes + allK8sNodes, err := util.ListK8sNodes(kubectlPath, "") + if err != nil { + return err + } + for _, node := range allK8sNodes { + err := addKubeProxyLabel(kubectlPath, node) + if err != nil { + glog.Error(err) + return err + } + } + + glog.Infof("update kube-proxy daemonset success") + return nil +} + +// StartKubeProxy start kube-proxy of the specified node +func (m *Manager) StartKubeProxy(nodeName string) error { + kubectlPath, err := getKubectlPath() + if err != nil { + glog.Error(err) + return err + } + err = addKubeProxyLabel(kubectlPath, nodeName) + if err != nil { + glog.Error(err) + return err + } + + glog.Infof("node %s kube-proxy is started", nodeName) + return nil +} + +// StopKubeProxy stop kube-proxy of the specified node +func (m *Manager) StopKubeProxy(nodeName string) error { + kubectlPath, err := getKubectlPath() + if err != nil { + glog.Error(err) + return err + } + commandStr := fmt.Sprintf("%s label no %s %s-", kubectlPath, nodeName, kubeProxyLabel) + cmd := exec.Command("/bin/sh", "-c", commandStr) + output, err := cmd.CombinedOutput() + if err != nil { + glog.Errorf("remove kube-proxy label failed, command: [%s], output: %s, error: %v", commandStr, string(output), err) + return err + } + + glog.Infof("node %s kube-proxy is stoped", nodeName) + return nil +} + +func getKubectlPath() (string, error) { + paths := filepath.SplitList(os.Getenv("PATH")) + kubectlPath, err := util.FindInPath("kubectl", paths) + if err != nil { + return "", fmt.Errorf("can't found kubectl binary, not deployed on the k8s master node, err: %v", err) + } + return kubectlPath, nil +} + +func addKubeProxyLabel(kubectlPath, nodeName string) error { + commandStr := fmt.Sprintf("%s label no %s %s= --overwrite", kubectlPath, nodeName, kubeProxyLabel) + cmd := exec.Command("/bin/sh", "-c", commandStr) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("add kube-proxy label failed, command: [%s], output: %s, error: %v", commandStr, string(output), err) + } + return nil +} diff --git a/tests/pkg/fault-trigger/manager/manager.go b/tests/pkg/fault-trigger/manager/manager.go index 6fe00d2ce5..22b41056fa 100644 --- a/tests/pkg/fault-trigger/manager/manager.go +++ b/tests/pkg/fault-trigger/manager/manager.go @@ -21,13 +21,15 @@ import ( // Manager to manager fault trigger type Manager struct { sync.RWMutex - vmCache map[string]string + vmCache map[string]string + kubeProxyImages string } // NewManager returns a manager instance -func NewManager() *Manager { +func NewManager(kubeProxyImage string) *Manager { return &Manager{ - vmCache: make(map[string]string), + kubeProxyImages: kubeProxyImage, + vmCache: make(map[string]string), } } diff --git a/tests/pkg/fault-trigger/manager/manifests.go b/tests/pkg/fault-trigger/manager/manifests.go new file mode 100644 index 0000000000..a5413e67df --- /dev/null +++ b/tests/pkg/fault-trigger/manager/manifests.go @@ -0,0 +1,82 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +const ( + KubeProxyDaemonSet = ` +apiVersion: apps/v1 +kind: DaemonSet +metadata: + labels: + k8s-app: kube-proxy + name: kube-proxy + namespace: kube-system +spec: + selector: + matchLabels: + k8s-app: kube-proxy + updateStrategy: + type: RollingUpdate + template: + metadata: + labels: + k8s-app: kube-proxy + annotations: + scheduler.alpha.kubernetes.io/critical-pod: "" + spec: + priorityClassName: system-node-critical + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: {{ .KubeProxyLabel }} + operator: Exists + containers: + - name: kube-proxy + image: {{ .Image }} + imagePullPolicy: IfNotPresent + command: + - /usr/local/bin/kube-proxy + - --config=/var/lib/kube-proxy/config.conf + securityContext: + privileged: true + volumeMounts: + - mountPath: /var/lib/kube-proxy + name: kube-proxy + - mountPath: /run/xtables.lock + name: xtables-lock + readOnly: false + - mountPath: /lib/modules + name: lib-modules + readOnly: true + hostNetwork: true + serviceAccountName: kube-proxy + volumes: + - name: kube-proxy + configMap: + name: kube-proxy + - name: xtables-lock + hostPath: + path: /run/xtables.lock + type: FileOrCreate + - name: lib-modules + hostPath: + path: /lib/modules + tolerations: + - key: CriticalAddonsOnly + operator: Exists + - operator: Exists +` +) diff --git a/tests/pkg/fault-trigger/manager/static_pod_service.go b/tests/pkg/fault-trigger/manager/static_pod_service.go index b58e47cf89..8782f84852 100644 --- a/tests/pkg/fault-trigger/manager/static_pod_service.go +++ b/tests/pkg/fault-trigger/manager/static_pod_service.go @@ -53,16 +53,6 @@ func (m *Manager) StopKubeAPIServer() error { return m.stopStaticPodService(KubeAPIServerService, kubeAPIServerManifes) } -// // StartKubeProxy starts the kube-proxy service -// func (m *Manager) StartKubeProxy() error { -// return m.startStaticPodService(KubeProxyService, kubeProxyManifest) -// } -// -// // StopKubeProxy stops the kube-proxy service -// func (m *Manager) StopKubeProxy() error { -// return m.stopStaticPodService(KubeProxyService, kubeProxyManifest) -// } - // StartKubeControllerManager starts the kube-controller-manager service func (m *Manager) StartKubeControllerManager() error { return m.startStaticPodService(KubeControllerManagerService, kubeControllerManagerManifest) @@ -74,12 +64,12 @@ func (m *Manager) StopKubeControllerManager() error { } func (m *Manager) stopStaticPodService(serviceName string, fileName string) error { - maniest := fmt.Sprintf("%s/%s", staticPodPath, fileName) - if _, err := os.Stat(maniest); os.IsNotExist(err) { + manifest := fmt.Sprintf("%s/%s", staticPodPath, fileName) + if _, err := os.Stat(manifest); os.IsNotExist(err) { glog.Infof("%s had been stopped before", serviceName) return nil } - shell := fmt.Sprintf("mkdir -p %s && mv %s %s", staticPodTmpPath, maniest, staticPodTmpPath) + shell := fmt.Sprintf("mkdir -p %s && mv %s %s", staticPodTmpPath, manifest, staticPodTmpPath) cmd := exec.Command("/bin/sh", "-c", shell) output, err := cmd.CombinedOutput() @@ -94,12 +84,12 @@ func (m *Manager) stopStaticPodService(serviceName string, fileName string) erro } func (m *Manager) startStaticPodService(serviceName string, fileName string) error { - maniest := fmt.Sprintf("%s/%s", staticPodTmpPath, fileName) - if _, err := os.Stat(maniest); os.IsNotExist(err) { + manifest := fmt.Sprintf("%s/%s", staticPodTmpPath, fileName) + if _, err := os.Stat(manifest); os.IsNotExist(err) { glog.Infof("%s had been started before", serviceName) return nil } - shell := fmt.Sprintf("mv %s %s", maniest, staticPodPath) + shell := fmt.Sprintf("mv %s %s", manifest, staticPodPath) cmd := exec.Command("/bin/sh", "-c", shell) output, err := cmd.CombinedOutput() diff --git a/tests/pkg/util/find.go b/tests/pkg/util/find.go new file mode 100644 index 0000000000..762c84dc85 --- /dev/null +++ b/tests/pkg/util/find.go @@ -0,0 +1,32 @@ +package util + +import ( + "fmt" + "os" + "path/filepath" +) + +// Valid file extensions for plugin executables. +var ExecutableFileExtensions = []string{""} + +// FindInPath returns the full path of the binary by searching in the provided path +func FindInPath(binary string, paths []string) (string, error) { + if binary == "" { + return "", fmt.Errorf("no binary name provided") + } + + if len(paths) == 0 { + return "", fmt.Errorf("no paths provided") + } + + for _, path := range paths { + for _, fe := range ExecutableFileExtensions { + fullpath := filepath.Join(path, binary) + fe + if fi, err := os.Stat(fullpath); err == nil && fi.Mode().IsRegular() { + return fullpath, nil + } + } + } + + return "", fmt.Errorf("failed to find binary %s in path %s", binary, paths) +} diff --git a/tests/pkg/util/template.go b/tests/pkg/util/template.go new file mode 100644 index 0000000000..3b45da1264 --- /dev/null +++ b/tests/pkg/util/template.go @@ -0,0 +1,21 @@ +package util + +import ( + "bytes" + "fmt" + "html/template" +) + +// ParseTemplate validates and parses passed as argument template +func ParseTemplate(strtmpl string, obj interface{}) ([]byte, error) { + var buf bytes.Buffer + tmpl, err := template.New("template").Parse(strtmpl) + if err != nil { + return nil, fmt.Errorf("error when parsing template: %v", err) + } + err = tmpl.Execute(&buf, obj) + if err != nil { + return nil, fmt.Errorf("error when executing template: %v", err) + } + return buf.Bytes(), nil +} diff --git a/tests/pkg/util/utils.go b/tests/pkg/util/utils.go new file mode 100644 index 0000000000..a0e37a853b --- /dev/null +++ b/tests/pkg/util/utils.go @@ -0,0 +1,81 @@ +package util + +import ( + "fmt" + "os" + "os/exec" + "strings" + + "github.com/golang/glog" +) + +const ( + // LabelNodeRoleMaster specifies that a node is a master + LabelNodeRoleMaster = "node-role.kubernetes.io/master" +) + +// GetHostname returns OS's hostname if 'hostnameOverride' is empty; otherwise, return 'hostnameOverride'. +func GetHostname(hostnameOverride string) (string, error) { + hostName := hostnameOverride + if len(hostName) == 0 { + nodeName, err := os.Hostname() + if err != nil { + return "", fmt.Errorf("couldn't determine hostname: %v", err) + } + hostName = nodeName + } + + // Trim whitespaces first to avoid getting an empty hostname + // For linux, the hostname is read from file /proc/sys/kernel/hostname directly + hostName = strings.TrimSpace(hostName) + if len(hostName) == 0 { + return "", fmt.Errorf("empty hostname is invalid") + } + return strings.ToLower(hostName), nil +} + +// ListK8sNodes returns k8s nodes base on node labels +func ListK8sNodes(kubectlPath, labels string) ([]string, error) { + commandSlice := []string{ + kubectlPath, + "get", + "no", + "--no-headers", + "-ocustom-columns=:.metadata.name", + } + if labels != "" { + commandSlice = append(commandSlice, "-l", labels) + } + commandStr := strings.Join(commandSlice, " ") + cmd := exec.Command("/bin/sh", "-c", commandStr) + output, err := cmd.CombinedOutput() + if err != nil { + return nil, fmt.Errorf("exec: [%s] failed, output: %s, error: %v", commandStr, string(output), err) + } + + nodes := strings.Split(strings.TrimSpace(string(output)), "\n") + if len(nodes) == 0 { + return nil, fmt.Errorf("get k8s nodes is empty") + } + glog.Infof("get k8s nodes success: %s, labels: %s", nodes, labels) + return nodes, nil +} + +// EnsureDirectoryExist create directory if does not exist +func EnsureDirectoryExist(dirName string) error { + src, err := os.Stat(dirName) + + if os.IsNotExist(err) { + errDir := os.MkdirAll(dirName, os.ModePerm) + if errDir != nil { + return fmt.Errorf("create dir %s failed. err: %v", dirName, err) + } + return nil + } + + if src.Mode().IsRegular() { + return fmt.Errorf("%s already exist as a file", dirName) + } + + return nil +}