Skip to content

Commit

Permalink
add fault inject for kube proxy (#384)
Browse files Browse the repository at this point in the history
* fix typo

* support for start and stop kube-proxy for each k8s node

* add KubeProxyLabel template variable

* add StartKubeProxy and StopKubeProxy method into Client interface
  • Loading branch information
onlymellb authored and xiaojingchen committed Apr 19, 2019
1 parent 6a5d583 commit 7040fa2
Show file tree
Hide file tree
Showing 11 changed files with 479 additions and 45 deletions.
20 changes: 17 additions & 3 deletions tests/cmd/fault-trigger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
_ "net/http/pprof"
"time"

"github.com/pingcap/tidb-operator/tests/pkg/util"

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/api"
"github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/manager"
Expand All @@ -28,13 +30,17 @@ import (
)

var (
port int
pprofPort int
port int
pprofPort int
kubeProxyImage string
hostnameOverride string
)

func init() {
flag.IntVar(&port, "port", 23332, "The port that the fault trigger's http service runs on (default 23332)")
flag.IntVar(&pprofPort, "pprof-port", 6060, "The port that the pprof's http service runs on (default 6060)")
flag.StringVar(&kubeProxyImage, "kube-proxy-image", "k8s.gcr.io/kube-proxy:v1.12.2", "The kube proxy image (default k8s.gcr.io/kube-proxy:v1.12.2)")
flag.StringVar(&hostnameOverride, "hostname-override", "", "If non-empty, will use this string as identification instead of the actual hostname")

flag.Parse()
}
Expand All @@ -43,7 +49,15 @@ func main() {
logs.InitLogs()
defer logs.FlushLogs()

mgr := manager.NewManager()
mgr := manager.NewManager(kubeProxyImage)
hostname, err := util.GetHostname(hostnameOverride)
if err != nil {
glog.Fatalf("get hostname failed, err: %v", err)
}
err = mgr.UpdateKubeProxyDaemonset(hostname)
if err != nil {
glog.Fatalf("update kube-proxy daemonset failed, err: %v", err)
}

server := api.NewServer(mgr, port)

Expand Down
6 changes: 3 additions & 3 deletions tests/pkg/fault-trigger/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func (s *Server) newService() *restful.WebService {

ws.Route(ws.POST(fmt.Sprintf("/%s/start", manager.KubeControllerManagerService)).To(s.startKubeControllerManager))
ws.Route(ws.POST(fmt.Sprintf("/%s/stop", manager.KubeControllerManagerService)).To(s.stopKubeControllerManager))
// TODO: support kube-proxy
// ws.Route(ws.POST(fmt.Sprintf("/%s/start", manager.KubeProxyService)).To(s.startKubeProxy))
// ws.Route(ws.POST(fmt.Sprintf("/%s/stop", manager.KubeProxyService)).To(s.stopKubeProxy))

ws.Route(ws.POST("/kube-proxy/{nodeName}/start").To(s.startKubeProxy))
ws.Route(ws.POST("/kube-proxy/{nodeName}/stop").To(s.stopKubeProxy))

return ws
}
58 changes: 51 additions & 7 deletions tests/pkg/fault-trigger/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,33 @@ func (s *Server) stopKubeAPIServer(req *restful.Request, resp *restful.Response)
s.action(req, resp, s.mgr.StopKubeAPIServer, "stopKubeAPIServer")
}

// func (s *Server) startKubeProxy(req *restful.Request, resp *restful.Response) {
// s.action(req, resp, s.mgr.StartKubeProxy, "startKubeProxy")
// }
//
// func (s *Server) stopKubeProxy(req *restful.Request, resp *restful.Response) {
// s.action(req, resp, s.mgr.StopKubeProxy, "stopKubeProxy")
// }
func (s *Server) startKubeProxy(req *restful.Request, resp *restful.Response) {
res := newResponse("startKubeProxy")
nodeName := req.PathParameter("nodeName")
if len(nodeName) == 0 {
res.message(fmt.Sprintf("nodeName can't be empty")).statusCode(http.StatusBadRequest)
if err := resp.WriteEntity(res); err != nil {
glog.Errorf("failed to response, methods: startKubeProxy, error: %v", err)
}
return
}

s.kubeProxyAction(req, resp, res, nodeName, s.mgr.StartKubeProxy, "startKubeProxy")
}

func (s *Server) stopKubeProxy(req *restful.Request, resp *restful.Response) {
res := newResponse("stopKubeProxy")
nodeName := req.PathParameter("nodeName")
if len(nodeName) == 0 {
res.message(fmt.Sprintf("nodeName can't be empty")).statusCode(http.StatusBadRequest)
if err := resp.WriteEntity(res); err != nil {
glog.Errorf("failed to response, methods: stopKubeProxy, error: %v", err)
}
return
}

s.kubeProxyAction(req, resp, res, nodeName, s.mgr.StopKubeProxy, "stopKubeProxy")
}

func (s *Server) startKubeScheduler(req *restful.Request, resp *restful.Response) {
s.action(req, resp, s.mgr.StartKubeScheduler, "startKubeScheduler")
Expand Down Expand Up @@ -209,6 +229,30 @@ func (s *Server) vmAction(
}
}

func (s *Server) kubeProxyAction(
req *restful.Request,
resp *restful.Response,
res *Response,
nodeName string,
fn func(nodeName string) error,
method string,
) {
if err := fn(nodeName); err != nil {
res.message(fmt.Sprintf("failed to invoke %s, nodeName: %s, error: %v", method, nodeName, err)).
statusCode(http.StatusInternalServerError)
if err = resp.WriteEntity(res); err != nil {
glog.Errorf("failed to response, methods: %s, error: %v", method, err)
}
return
}

res.message("OK").statusCode(http.StatusOK)

if err := resp.WriteEntity(res); err != nil {
glog.Errorf("failed to response, method: %s, error: %v", method, err)
}
}

func (s *Server) getVM(name string) (*manager.VM, error) {
vms, err := s.mgr.ListVMs()
if err != nil {
Expand Down
38 changes: 25 additions & 13 deletions tests/pkg/fault-trigger/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ type Client interface {
// StopKubeControllerManager stops the kube-controller-manager service
StopKubeControllerManager() error
// TODO: support controller kube-proxy
// // StartKubeProxy starts the kube-proxy service
// StartKubeProxy() error
// // StopKubeProxy stops the kube-proxy service
// StopKubeProxy() error
// StartKubeProxy starts the kube-proxy of the specific node
StartKubeProxy(nodeName string) error
// // StopKubeProxy stops the kube-proxy of the specific node
StopKubeProxy(nodeName string) error
// // StartKubeScheduler starts the kube-scheduler service
}

Expand Down Expand Up @@ -219,13 +219,25 @@ func (c *client) StopKubeAPIServer() error {
return c.stopService(manager.KubeAPIServerService)
}

// func (c *client) StartKubeProxy() error {
// return c.startService(manager.KubeProxyService)
// }
//
// func (c *client) StopKubeProxy() error {
// return c.stopService(manager.KubeProxyService)
// }
func (c *client) StartKubeProxy(nodeName string) error {
url := util.GenURL(fmt.Sprintf("%s%s/kube-proxy/%s/start", c.cfg.Addr, api.APIPrefix, nodeName))
if _, err := c.post(url, nil); err != nil {
glog.Errorf("failed to post %s: %v", url, err)
return err
}

return nil
}

func (c *client) StopKubeProxy(nodeName string) error {
url := util.GenURL(fmt.Sprintf("%s%s/kube-proxy/%s/stop", c.cfg.Addr, api.APIPrefix, nodeName))
if _, err := c.post(url, nil); err != nil {
glog.Errorf("failed to post %s: %v", url, err)
return err
}

return nil
}

func (c *client) StartKubeScheduler() error {
return c.startService(manager.KubeSchedulerService)
Expand All @@ -246,7 +258,7 @@ func (c *client) StopKubeControllerManager() error {
func (c *client) startService(serviceName string) error {
url := util.GenURL(fmt.Sprintf("%s%s/%s/start", c.cfg.Addr, api.APIPrefix, serviceName))
if _, err := c.post(url, nil); err != nil {
glog.Errorf("faled to post %s: %v", url, err)
glog.Errorf("failed to post %s: %v", url, err)
return err
}

Expand All @@ -256,7 +268,7 @@ func (c *client) startService(serviceName string) error {
func (c *client) stopService(serviceName string) error {
url := util.GenURL(fmt.Sprintf("%s%s/%s/stop", c.cfg.Addr, api.APIPrefix, serviceName))
if _, err := c.post(url, nil); err != nil {
glog.Errorf("faled to post %s: %v", url, err)
glog.Errorf("failed to post %s: %v", url, err)
return err
}

Expand Down
156 changes: 156 additions & 0 deletions tests/pkg/fault-trigger/manager/kube_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package manager

import (
"fmt"
"os"
"os/exec"
"path/filepath"

"github.com/golang/glog"

"github.com/pingcap/tidb-operator/tests/pkg/util"
)

const (
kubeProxyPath = "/etc/kubernetes/kube-proxy"
kubeProxyManifest = "kube-proxy-ds.yml"
kubeProxyLabel = "app.kubernetes.io/kube-proxy"
)

// UpdateKubeProxyDaemonset add node affinity for previous kube-proxy daemonset
func (m *Manager) UpdateKubeProxyDaemonset(hostname string) error {
kubectlPath, err := getKubectlPath()
if err != nil {
glog.Infof("skip update kube-proxy daemonset, err: %v", err)
return nil
}

// Get all k8s master nodes
k8sMasterNodes, err := util.ListK8sNodes(kubectlPath, fmt.Sprintf("%s=", util.LabelNodeRoleMaster))
if err != nil {
return err
}

// Select the first k8s master node to execute the update operation
if hostname != k8sMasterNodes[0] {
glog.Infof("not the first k8s master node, skip update kube-proxy daemonset, node name: %s", hostname)
return nil
}

// Ensure kubeProxyPath exists
err = util.EnsureDirectoryExist(kubeProxyPath)
if err != nil {
return err
}

// Render kub-proxy daemonset's template
kubeProxyManifestFullPath := filepath.Join(kubeProxyPath, kubeProxyManifest)
f, err := os.Create(kubeProxyManifestFullPath)
if err != nil {
return fmt.Errorf("create file %s failed, err: %v", kubeProxyManifestFullPath, err)
}
defer f.Close()
proxyDaemonSetBytes, err := util.ParseTemplate(KubeProxyDaemonSet, struct{ Image, KubeProxyLabel string }{
Image: m.kubeProxyImages,
KubeProxyLabel: kubeProxyLabel,
})
if err != nil {
return fmt.Errorf("parsing kube-proxy daemonset template failed, err: %v", err)
}
_, err = f.Write(proxyDaemonSetBytes)
if err != nil {
return fmt.Errorf("write kube-proxy daemonset manifest into file %s failed, err: %v", kubeProxyManifestFullPath, err)
}

// Apply kube-proxy daemonset manifest
commandStr := fmt.Sprintf("%s apply -f %s", kubectlPath, kubeProxyManifestFullPath)
cmd := exec.Command("/bin/sh", "-c", commandStr)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("exec: [%s] failed, output: %s, error: %v", commandStr, string(output), err)
}

// Add app.kubernetes.io/kube-proxy= label for all k8s nodes
allK8sNodes, err := util.ListK8sNodes(kubectlPath, "")
if err != nil {
return err
}
for _, node := range allK8sNodes {
err := addKubeProxyLabel(kubectlPath, node)
if err != nil {
glog.Error(err)
return err
}
}

glog.Infof("update kube-proxy daemonset success")
return nil
}

// StartKubeProxy start kube-proxy of the specified node
func (m *Manager) StartKubeProxy(nodeName string) error {
kubectlPath, err := getKubectlPath()
if err != nil {
glog.Error(err)
return err
}
err = addKubeProxyLabel(kubectlPath, nodeName)
if err != nil {
glog.Error(err)
return err
}

glog.Infof("node %s kube-proxy is started", nodeName)
return nil
}

// StopKubeProxy stop kube-proxy of the specified node
func (m *Manager) StopKubeProxy(nodeName string) error {
kubectlPath, err := getKubectlPath()
if err != nil {
glog.Error(err)
return err
}
commandStr := fmt.Sprintf("%s label no %s %s-", kubectlPath, nodeName, kubeProxyLabel)
cmd := exec.Command("/bin/sh", "-c", commandStr)
output, err := cmd.CombinedOutput()
if err != nil {
glog.Errorf("remove kube-proxy label failed, command: [%s], output: %s, error: %v", commandStr, string(output), err)
return err
}

glog.Infof("node %s kube-proxy is stoped", nodeName)
return nil
}

func getKubectlPath() (string, error) {
paths := filepath.SplitList(os.Getenv("PATH"))
kubectlPath, err := util.FindInPath("kubectl", paths)
if err != nil {
return "", fmt.Errorf("can't found kubectl binary, not deployed on the k8s master node, err: %v", err)
}
return kubectlPath, nil
}

func addKubeProxyLabel(kubectlPath, nodeName string) error {
commandStr := fmt.Sprintf("%s label no %s %s= --overwrite", kubectlPath, nodeName, kubeProxyLabel)
cmd := exec.Command("/bin/sh", "-c", commandStr)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("add kube-proxy label failed, command: [%s], output: %s, error: %v", commandStr, string(output), err)
}
return nil
}
8 changes: 5 additions & 3 deletions tests/pkg/fault-trigger/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
// Manager to manager fault trigger
type Manager struct {
sync.RWMutex
vmCache map[string]string
vmCache map[string]string
kubeProxyImages string
}

// NewManager returns a manager instance
func NewManager() *Manager {
func NewManager(kubeProxyImage string) *Manager {
return &Manager{
vmCache: make(map[string]string),
kubeProxyImages: kubeProxyImage,
vmCache: make(map[string]string),
}
}

Expand Down
Loading

0 comments on commit 7040fa2

Please sign in to comment.