Skip to content

Commit

Permalink
Merge pull request #47 from chaitin/feat(kubernetes)/watch
Browse files Browse the repository at this point in the history
feat(kubernetes): watch
  • Loading branch information
zhoubinxuan authored Nov 14, 2022
2 parents e39edc3 + c6cb8e9 commit ed771fa
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 39 deletions.
21 changes: 17 additions & 4 deletions go/cmd/kubernetes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"encoding/base64"
"strings"

"github.com/spf13/pflag"
Expand All @@ -25,7 +26,9 @@ func (r kubernetesRoot) Mode() string {
func (r kubernetesRoot) Options() plugin.ExecOption {
return plugin.WithExecOptions(
plugin.WithPrependArgs(
"--kube-config", r.k.ConfigPath()),
"--kube-config-path", r.k.ConfigPath()),
plugin.WithPrependArgs("--kube-config-bytes",
base64.StdEncoding.EncodeToString(r.k.ConfigBytes())),
plugin.WithPrependArgs(
"--namespace", r.k.CurrentNamespace()),
plugin.WithPrependArgs(
Expand All @@ -50,10 +53,20 @@ func (kubernetesMode) Name() string {
func (kubernetesMode) AddFlags(fset *pflag.FlagSet) {
pflagext.StringVarF(fset, func(path string) error {
kubernetesFlags = append(kubernetesFlags,
kubernetes.WithKubeConfig(path))
kubernetes.WithKubeConfigPath(path))
return nil
}, "kube-config",
`flag "--kube-config" specified kube config`)
}, "kube-config-path",
`flag "--kube-config-path" specified kube config`)
pflagext.StringVarF(fset, func(config string) error {
b, err := base64.StdEncoding.DecodeString(config)
if err != nil {
return err
}
kubernetesFlags = append(kubernetesFlags,
kubernetes.WithKubeConfigBytes(b))
return nil
}, "kube-config-bytes",
`flag "--kube-config-bytes" specified kube config bytes`)
pflagext.StringVarF(fset, func(namespace string) error {
kubernetesFlags = append(kubernetesFlags,
kubernetes.WithNamespace(namespace))
Expand Down
89 changes: 54 additions & 35 deletions go/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"

api "github.com/chaitin/libveinmind/go"
)
Expand All @@ -21,8 +22,11 @@ type Kubernetes struct {
// kubernetes cluster namespace
namespace string

// kube kubeConfig path for cluster
kubeConfig string
// kubeConfigPath path for cluster
kubeConfigPath string

// kubeConfigBytes for cluster
kubeConfigBytes []byte

// dynamicClient reference dynamic.Interface
// return data use map[string]interface{} format
Expand All @@ -45,9 +49,16 @@ func WithNamespace(namespace string) NewOption {
}
}

func WithKubeConfig(path string) NewOption {
func WithKubeConfigPath(path string) NewOption {
return func(kubernetes *Kubernetes) error {
kubernetes.kubeConfigPath = path
return nil
}
}

func WithKubeConfigBytes(config []byte) NewOption {
return func(kubernetes *Kubernetes) error {
kubernetes.kubeConfig = path
kubernetes.kubeConfigBytes = config
return nil
}
}
Expand All @@ -74,53 +85,57 @@ func New(options ...NewOption) (*Kubernetes, error) {
err error
)

// init namespace
if k.namespace == "" {
k.namespace = "default"
}

// init rest config
if k.inCluster {
restConfig, err = rest.InClusterConfig()
if err != nil {
return nil, err
}

clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}

grs, err := restmapper.GetAPIGroupResources(clientset.Discovery())
if err != nil {
return nil, err
}

k.restMapper = restmapper.NewDiscoveryRESTMapper(grs)
} else {
if k.kubeConfig == "" {
if k.kubeConfigPath == "" {
if os.Getenv("KUBECONFIG") == "" {
return nil, errors.New("kubernetes: can't find kube config path")
} else {
k.kubeConfig = os.Getenv("KUBECONFIG")
k.kubeConfigPath = os.Getenv("KUBECONFIG")
}
}

if k.namespace == "" {
k.namespace = "default"
if k.kubeConfigBytes != nil && len(k.kubeConfigBytes) > 0 {
restConfig, err = clientcmd.RESTConfigFromKubeConfig(k.kubeConfigBytes)
if err != nil {
return nil, errors.Wrap(err, "kubernetes: can't get rest config")
}
} else if k.kubeConfigPath != "" {
config := genericclioptions.NewConfigFlags(true)
*config.KubeConfig = k.kubeConfigPath
configLoader := config.ToRawKubeConfigLoader()
restConfig, err = configLoader.ClientConfig()
if err != nil {
return nil, errors.Wrap(err, "kubernetes: can't get rest config")
}
} else {
return nil, errors.New("kubernetes: can'f find kube config path or bytes")
}
}

// init dynamic client config
config := genericclioptions.NewConfigFlags(true)
*config.KubeConfig = k.kubeConfig
configLoader := config.ToRawKubeConfigLoader()
restConfig, err = configLoader.ClientConfig()
if err != nil {
return nil, errors.Wrap(err, "kubernetes: can't get rest config")
}
// init rest mapper
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}

// init rest mapper
mapper, err := config.ToRESTMapper()
if err != nil {
return nil, errors.Wrap(err, "kubernetes: can't init rest mapper")
}
k.restMapper = mapper
grs, err := restmapper.GetAPIGroupResources(clientset.Discovery())
if err != nil {
return nil, err
}

k.restMapper = restmapper.NewDiscoveryRESTMapper(grs)

// init dynamic client
dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
Expand All @@ -145,7 +160,11 @@ func (k *Kubernetes) CurrentNamespace() string {
}

func (k *Kubernetes) ConfigPath() string {
return k.kubeConfig
return k.kubeConfigPath
}

func (k *Kubernetes) ConfigBytes() []byte {
return k.kubeConfigBytes
}

func (k *Kubernetes) InCluster() bool {
Expand Down
5 changes: 5 additions & 0 deletions go/kubernetes/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
)

Expand Down Expand Up @@ -70,6 +71,10 @@ func (r Resource) Update(ctx context.Context, resource []byte) error {
return err
}

func (r Resource) Watch(ctx context.Context) (watch.Interface, error) {
return r.resourceInterface.Watch(ctx, v1.ListOptions{})
}

func (r Resource) Kind() string {
return r.kind
}
Expand Down
3 changes: 3 additions & 0 deletions go/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type Cluster interface {
// ConfigPath return config path of cluster
ConfigPath() string

// ConfigBytes return config path of cluster
ConfigBytes() []byte

// ListNamespaces attempt to list all namespaces in cluster
ListNamespaces() ([]string, error)

Expand Down

0 comments on commit ed771fa

Please sign in to comment.