本章的任务主线任务如下
- kubectl怎么启动
- kubectl apply -f deployement.yaml发送了什么
- kubectl logs -f deplyment/redis-leader发生了什么
- kubectl exec -it redis-leader-xxxx-yyyy发生了什么
- kubectl describe deployment/redis-leader发生了什么
2-5暂时不展开了
其实kubectl的每个子命令流程大体分为三步,
- 解析命令行参数
- 创建kubeconfig(具体的struct名字是ConfigFlags)对象, 基于这个对象创建对应的rest client(默认是一个本地缓存的client)
- 基于命令参数的操作用rest client与api server进行交互
首先以下面的命令作为起始任务,看看kubectl的代码结构
kubectl get deploy
kubernetes的所有组件入口函数几乎都是一致的,使用的命令行解析库是cobra
func main() {
rand.Seed(time.Now().UnixNano())
command := cmd.NewDefaultKubectlCommand()
// 命令行设置
// 设置正则函数,将_转成-
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// 兼容用原生flag库设置的命令行参数
pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
// 初始化日志
logs.InitLogs()
defer logs.FlushLogs()
// 开始调用
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
为什么不直接用标准库的flag呢?因为标准库的flag库与linux下的POSIX命令行参数标准不兼容,比如linux下的命令行 -a -b可以连起来写成-ab, 而golang官方库的flag不支持。
GNU POSIX命令行参数标准参考: http://www.gnu.org/software/libc/manual/html_node/Argument-Syntax.html
// New一个对象 常规操作了
func NewDefaultKubectlCommand() *cobra.Command {
return NewDefaultKubectlCommandWithArgs(NewDefaultPluginHandler(plugin.ValidPluginFilenamePrefixes), os.Args, os.Stdin, os.Stdout, os.Stderr)
}
func NewDefaultKubectlCommandWithArgs(pluginHandler PluginHandler, args []string, in io.Reader, out, errout io.Writer) *cobra.Command {
cmd := NewKubectlCommand(in, out, errout)
// 默认会检测命令行是否有调用插件的pluginHandler
if pluginHandler == nil {
return cmd
}
// 检查参数中是不是调用插件
if len(args) > 1 {
// 将kubectl get deploy 截断成 get deploy以检测后续命令是否是插件的命令行
// 这里显然不是
cmdPathPieces := args[1:]
if _, _, err := cmd.Find(cmdPathPieces); err != nil {
// 如果是调用插件就执行然后退出
if err := HandlePluginCommand(pluginHandler, cmdPathPieces); err != nil {
fmt.Fprintf(errout, "Error: %v\n", err)
os.Exit(1)
}
}
}
return cmd
}
// 最终command在这里构造
func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
warningHandler := rest.NewWarningWriter(err, rest.WarningWriterOptions{Deduplicate: true, Color: term.AllowsColorOutput(err)})
warningsAsErrors := false
// 最顶级的Command对象, 多有子命令都挂载在这个对象之下
cmds := &cobra.Command{
Use: "kubectl",
Short: i18n.T("kubectl controls the Kubernetes cluster manager"),
Long: templates.LongDesc(`
kubectl controls the Kubernetes cluster manager.
Find more information at:
https://kubernetes.io/docs/reference/kubectl/overview/`),
Run: runHelp,
// cobra的Persistent关键字代表持久化,可继承的意思,即子命令会继承
// 运行RunE之前
PersistentPreRunE: func(*cobra.Command, []string) error {
rest.SetDefaultWarningHandler(warningHandler)
return initProfiling()
},
// 运行RunE之前
PersistentPostRunE: func(*cobra.Command, []string) error {
if err := flushProfiling(); err != nil {
return err
}
if warningsAsErrors {
count := warningHandler.WarningCount()
switch count {
case 0:
// no warnings
case 1:
return fmt.Errorf("%d warning received", count)
default:
return fmt.Errorf("%d warnings received", count)
}
}
return nil
},
}
flags := cmds.PersistentFlags()
flags.SetNormalizeFunc(cliflag.WarnWordSepNormalizeFunc) // 对于下划线 _ 的参数警告
flags.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// 增加性能调优的参数开关
// 统计CPU,内存等相关信息,用于性能优化
addProfilingFlags(flags)
flags.BoolVar(&warningsAsErrors, "warnings-as-errors", warningsAsErrors, "Treat warnings received from the server as errors and exit with a non-zero exit code")
// ConfigFlags对象, 后续创建rest client的生成基本来自这
// 创建空对象
kubeConfigFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag()
// 设置命令参数,将参数解析值绑定到kubeConfigFlags
kubeConfigFlags.AddFlags(flags)
// 包装一层,其实差不多就是加一个是否匹配client与server版本的参数match-server-version
matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
matchVersionKubeConfigFlags.AddFlags(cmds.PersistentFlags())
// Updates hooks to add kubectl command headers: SIG CLI KEP 859.
// 为rest client 增加HTTP Header
// 类似: Kubectl-Command: kubectl apply
addCmdHeaderHooks(cmds, kubeConfigFlags)
// 为持久化命令行参数对象也增加(兼容)原生flag命令行参数
cmds.PersistentFlags().AddGoFlagSet(flag.CommandLine)
// 最终kubeconfig对象被包装成一个工厂类型
// 该对象实现了DynamicClient(), KubernetesClientSet(), NewBuilder()等接口
// 这一层层的包装是委托者设计模式,见下文
f := cmdutil.NewFactory(matchVersionKubeConfigFlags)
// 将标准输入,输出,错误包装一下
ioStreams := genericclioptions.IOStreams{In: in, Out: out, ErrOut: err}
// 下面就是kubectl的所有子命令,其中proxyCmd不兼容
// 跟CommandHeaderRoundTripper不兼容, 所以单独创建
// clear the WrapConfigFn before running proxy command.
proxyCmd := proxy.NewCmdProxy(f, ioStreams)
proxyCmd.PreRun = func(cmd *cobra.Command, args []string) {
kubeConfigFlags.WrapConfigFn = nil
}
groups := templates.CommandGroups{
{
Message: "Basic Commands (Beginner):",
Commands: []*cobra.Command{
create.NewCmdCreate(f, ioStreams),
expose.NewCmdExposeService(f, ioStreams),
run.NewCmdRun(f, ioStreams),
set.NewCmdSet(f, ioStreams),
},
},
{
Message: "Basic Commands (Intermediate):",
Commands: []*cobra.Command{
explain.NewCmdExplain("kubectl", f, ioStreams),
// 本次命令kubectl get deploy会调用的子命令
get.NewCmdGet("kubectl", f, ioStreams),
edit.NewCmdEdit(f, ioStreams),
delete.NewCmdDelete(f, ioStreams),
},
},
{
Message: "Deploy Commands:",
Commands: []*cobra.Command{
rollout.NewCmdRollout(f, ioStreams),
scale.NewCmdScale(f, ioStreams),
autoscale.NewCmdAutoscale(f, ioStreams),
},
},
{
Message: "Cluster Management Commands:",
Commands: []*cobra.Command{
certificates.NewCmdCertificate(f, ioStreams),
clusterinfo.NewCmdClusterInfo(f, ioStreams),
top.NewCmdTop(f, ioStreams),
drain.NewCmdCordon(f, ioStreams),
drain.NewCmdUncordon(f, ioStreams),
drain.NewCmdDrain(f, ioStreams),
taint.NewCmdTaint(f, ioStreams),
},
},
{
Message: "Troubleshooting and Debugging Commands:",
Commands: []*cobra.Command{
describe.NewCmdDescribe("kubectl", f, ioStreams),
logs.NewCmdLogs(f, ioStreams),
attach.NewCmdAttach(f, ioStreams),
cmdexec.NewCmdExec(f, ioStreams),
portforward.NewCmdPortForward(f, ioStreams),
proxyCmd,
cp.NewCmdCp(f, ioStreams),
auth.NewCmdAuth(f, ioStreams),
debug.NewCmdDebug(f, ioStreams),
},
},
{
Message: "Advanced Commands:",
Commands: []*cobra.Command{
diff.NewCmdDiff(f, ioStreams),
apply.NewCmdApply("kubectl", f, ioStreams),
patch.NewCmdPatch(f, ioStreams),
replace.NewCmdReplace(f, ioStreams),
wait.NewCmdWait(f, ioStreams),
kustomize.NewCmdKustomize(ioStreams),
},
},
{
Message: "Settings Commands:",
Commands: []*cobra.Command{
label.NewCmdLabel(f, ioStreams),
annotate.NewCmdAnnotate("kubectl", f, ioStreams),
completion.NewCmdCompletion(ioStreams.Out, ""),
},
},
}
// 将上面对象作为子命令添加到cobra.Command对象中
groups.Add(cmds)
filters := []string{"options"}
// 然后是其他子命令
alpha := NewCmdAlpha(ioStreams)
if !alpha.HasSubCommands() {
filters = append(filters, alpha.Name())
}
templates.ActsAsRootCommand(cmds, filters, groups...)
util.SetFactoryForCompletion(f)
registerCompletionFuncForGlobalFlags(cmds, f)
cmds.AddCommand(alpha)
cmds.AddCommand(cmdconfig.NewCmdConfig(clientcmd.NewDefaultPathOptions(), ioStreams))
cmds.AddCommand(plugin.NewCmdPlugin(ioStreams))
cmds.AddCommand(version.NewCmdVersion(f, ioStreams))
cmds.AddCommand(apiresources.NewCmdAPIVersions(f, ioStreams))
cmds.AddCommand(apiresources.NewCmdAPIResources(f, ioStreams))
cmds.AddCommand(options.NewCmdOptions(ioStreams.Out))
return cmds
}
在深入get.NewCmdGet("kubectl", f, ioStreams)之前,首先看看委托者模式
package main
import "fmt"
type Client interface {
Get(string)
}
type A struct {
}
func (a *A) Get(val string) {
fmt.Printf("A Get %s\n", val)
}
type B struct {
Delegate Client
}
func (b *B) Get(val string) {
fmt.Println("from B to Delegate")
b.Delegate.Get(val)
}
type C struct {
Delegate Client
}
func (c *C) Get(val string) {
fmt.Println("from C to Delegate")
c.Delegate.Get(val)
}
func main() {
a := A{}
b := B{&a}
c := C{&b}
c.Get("hello world")
}
输出如下:
from C to Delegate
from B to Delegate
A Get hello world
总的来书,具体的实现由Delegate实现,上层就是一层层的往下委托, 而kubectl中的f := cmdutil.NewFactory(matchVersionKubeConfigFlags)就是如此。
以ToRESTConfig方法为例,委托调用链如下
factoryImpl.ToRESTConfig -> f.clientGetter.ToRESTConfig() -> MatchVersionFlags.ToRESTConfig -> f.Delegate.ToRESTConfig() -> ConfigFlags.ToRESTConfig
委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin
// kubectl get deploy对应的子命令
// 为了减少篇幅,就将模板代码及命令行参数去掉了
func NewCmdGet(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
o := NewGetOptions(parent, streams)
cmd := &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
// 填充必要的默认值, 以便后续能够正常调用
cmdutil.CheckErr(o.Complete(f, cmd, args))
// 验证参数是否正确
cmdutil.CheckErr(o.Validate(cmd))
// 最终的运行逻辑
cmdutil.CheckErr(o.Run(f, cmd, args))
},
}
return cmd
}
每个cobra.Command的创建其实差不多
func (o *GetOptions) Run(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
// 省略了raw,watch参数对应的操作
// 构造者模式
// 以构造者模式创建一个实现了访问者模式的Result对象
r := f.NewBuilder().
// 以map的方式传输数据对象,对响应内容中的数据做一层封装,这样就可以保留所有字段而不需要首先解析成一个struct
Unstructured().
// 基于命令行参数设置查询的namespace
NamespaceParam(o.Namespace).DefaultNamespace().AllNamespaces(o.AllNamespaces).
// 解析文件名参数 参数 -f
// 这里没有指定 所以b.paths为空
FilenameParam(o.ExplicitNamespace, &o.FilenameOptions).
// 解析标签选择器 参数 -l
LabelSelectorParam(o.LabelSelector).
// 解析字段参数
FieldSelectorParam(o.FieldSelector).
// 是否分段请求, 避免响应过大
RequestChunksOf(chunkSize).
// kubectl get deploy 指定了资源为deployment
ResourceTypeOrNameArgs(true, args...).
// 配置result对象在出现错误的行为, 意思很明显,在出错之后继续
ContinueOnError().
// 获取最新对象
Latest().
// 将对象展开
// 比如对象是[a, b], 如果没有flatten就是完成访问[a,b]作为一个整体,反之, 让外层函数分别访问a,b
Flatten().
// 配置一个在请求中修改request对象函数
TransformRequests(o.transformRequests).
// 基于之前的配置,生成最终的result对象
Do()
allErrs := []error{}
errs := sets.NewString()
// 获取数据
infos, err := r.Infos()
// 关于怎么输出,就省略了
// 将所有错误聚合在一起返回
return utilerrors.NewAggregate(allErrs)
}
在深入请求的逻辑之前先看看代码中用到的设计模式,其实还有一个装饰器模式,但其实就是在原函数的基础上包装一层,就不说了。
package main
import "fmt"
type Car struct {
name string
color string
speed string
}
func (c *Car) Name(name string) {
c.name = name
}
func (c *Car) Color(color string) {
c.color = color
}
func (c *Car) Speed(speed int) {
c.speed = fmt.Sprintf("%d km/h", speed)
}
func (c *Car) Run() {
fmt.Printf("一辆叫做%s的%s汽车以%s速度在狂奔", c.name, c.color, c.speed)
}
func main() {
c1 := &Car{}
c1.Name("c1")
c1.Color("red")
c1.Speed(100)
c1.Run()
}
访问者模式的好处在于将熟悉的配置抽象成方法,可以隐藏一些复杂的细节,以及控制内部属性
package main
import (
"encoding/json"
"fmt"
)
type VisitFunc func(interface{}) ([]byte, error)
type Visitor interface {
Visit(VisitFunc) ([]byte, error)
}
type Data struct {
Name string `json:"name" xml:"name"`
Age int `json:"age" xml:"age"`
}
func (d *Data) Visit(fn VisitFunc) ([]byte, error) {
fmt.Println("最内层调用")
return fn(d)
}
type JsonVisitor struct {
visitor Visitor
}
func (j *JsonVisitor) Visit(fn VisitFunc) ([]byte, error) {
return j.visitor.Visit(func(data interface{}) ([]byte, error) {
fmt.Println("由JsonVisitor访问")
buf, _ := json.MarshalIndent(data, "", " ")
fmt.Println(string(buf))
return fn(data)
})
}
type XMLVisitor struct {
visitor Visitor
}
func (x *XMLVisitor) Visit(fn VisitFunc) ([]byte, error) {
return x.visitor.Visit(func(data interface{}) ([]byte, error) {
fmt.Println("由XMLVisitor访问")
buf, _ := json.MarshalIndent(data, "", " ")
fmt.Println(string(buf))
return fn(data)
})
}
func main() {
d1 := Data{"d1", 11}
jv := JsonVisitor{&d1}
xv := XMLVisitor{&jv}
buf, _ := xv.Visit(func(interface{}) ([]byte, error) {
return []byte("最外层调用"), nil
})
fmt.Println(string(buf))
}
输出如下
最内层调用
由JsonVisitor访问
{
"name": "d1",
"age": 11
}
由XMLVisitor访问
<Data>
<name>d1</name>
<age>11</age>
</Data>
最外层调用
访问者模式的好处在于定义统一的接口让外部与内部交流,解耦数据与数据处理逻辑。
设计模式一般在大型项目用的多,是否需要看自己需要
从上文知道,result对象最终在Do方法中创建,所以看下它对应的实现。
// 返回一个实现了Visitor接口的Result对象,这个对象对应的资源用构建者构造的时候配置
func (b *Builder) Do() *Result {
// 基于参数创建result对象, 创建对应的rest client等资源
r := b.visitorResult()
// mapper是一个用于在k8s资源与响应内容之间的一个映射,差不多可以看做一个解码器,这里的mapper, 在构造者的Unstructured方法中指定
r.mapper = b.Mapper()
// 后面的判断都是在result对象的基础上加上一层又一层的包装
if b.flatten {
// 将数据列表扁平化,
r.visitor = NewFlattenListVisitor(r.visitor, b.objectTyper, b.mapper)
}
helpers := []VisitorFunc{}
if b.defaultNamespace {
// 设置namespace属性
helpers = append(helpers, SetNamespace(b.namespace))
}
if b.requireNamespace {
// 如果数据对象没有namespace就设置一个,有就检验是否与需要的一致
helpers = append(helpers, RequireNamespace(b.namespace))
}
helpers = append(helpers, FilterNamespace)
if b.requireObject {
// 检索资源, 在这里触发http 请求
helpers = append(helpers, RetrieveLazy)
}
if b.continueOnError {
// 遇到错误时继续
r.visitor = NewDecoratedVisitor(ContinueOnErrorVisitor{r.visitor}, helpers...)
} else {
r.visitor = NewDecoratedVisitor(r.visitor, helpers...)
}
return r
}
那么从外层到内层的调用链如下:
DecoratedVisitor -> ContinueOnErrorVisitor -> RetrieveLazy -> RequireNamespace -> FlattenListVisitor -> Result
构造对应的result对象
func (b *Builder) visitorResult() *Result {
// 如果指定了 -f 参数, 那么最终的result对象在这里创建
if len(b.paths) != 0 {
return b.visitByPaths()
}
// 基于选择器
if b.labelSelector != nil || b.fieldSelector != nil {
return b.visitBySelector()
}
// 如果参数是 resource_name/name 比如deployment/nginx
if len(b.resourceTuples) != 0 {
return b.visitByResource()
}
// 本文中的数据流在这 因为参数是kubectl get deploy, deploy就是那个name
if len(b.names) != 0 {
return b.visitByName()
}
}
这个result对象的构造过程比较复杂,我也没完全看懂,等我看懂了在来写吧。
那么继续看看数据在哪里出发吧。
// result的Infos方法可以多次调用,所以这里会判断对象是否已经有获取的结果
func RetrieveLazy(info *Info, err error) error {
if err != nil {
return err
}
if info.Object == nil {
return info.Get()
}
return nil
}
// 基于构造的client及mapping获取对象解析对象
func (i *Info) Get() (err error) {
obj, err := NewHelper(i.Client, i.Mapping).Get(i.Namespace, i.Name)
if err != nil {
if errors.IsNotFound(err) && len(i.Namespace) > 0 && i.Namespace != metav1.NamespaceDefault && i.Namespace != metav1.NamespaceAll {
err2 := i.Client.Get().AbsPath("api", "v1", "namespaces", i.Namespace).Do(context.TODO()).Error()
if err2 != nil && errors.IsNotFound(err2) {
return err2
}
}
return err
}
i.Object = obj
i.ResourceVersion, _ = metadataAccessor.ResourceVersion(obj)
return nil
}
未完待续。。。。