Skip to content

Commit

Permalink
refactor: command get (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
wencaiwulue authored May 24, 2024
1 parent 9be029e commit a26abab
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 379 deletions.
61 changes: 53 additions & 8 deletions cmd/kubevpn/cmds/get.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package cmds

import (
"fmt"
"cmp"
"encoding/json"
"os"
"slices"
"strings"

"github.com/spf13/cobra"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/printers"
cmdget "k8s.io/kubectl/pkg/cmd/get"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/templates"
"sigs.k8s.io/yaml"

"github.com/wencaiwulue/kubevpn/v2/pkg/daemon"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
)

func CmdGet(f cmdutil.Factory) *cobra.Command {
var printFlags = cmdget.NewGetPrintFlags()
cmd := &cobra.Command{
Use: "get",
Hidden: true,
Expand Down Expand Up @@ -42,7 +49,7 @@ func CmdGet(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
client, err := daemon.GetClient(false).Get(
client, err := daemon.GetClient(true).Get(
cmd.Context(),
&rpc.GetRequest{
Namespace: namespace,
Expand All @@ -52,13 +59,51 @@ func CmdGet(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
marshal, err := yaml.Marshal(client.Metadata)
if err != nil {
return err
w := printers.GetNewTabWriter(os.Stdout)
var toPrinter = func() (printers.ResourcePrinterFunc, error) {
var flags = printFlags.Copy()
_ = flags.EnsureWithNamespace()
printer, err := flags.ToPrinter()
if err != nil {
return nil, err
}
printer, err = printers.NewTypeSetter(scheme.Scheme).WrapToPrinter(printer, nil)
if err != nil {
return nil, err
}
outputOption := cmd.Flags().Lookup("output").Value.String()
if strings.Contains(outputOption, "custom-columns") || outputOption == "yaml" || strings.Contains(outputOption, "json") {
} else {
printer = &cmdget.TablePrinter{Delegate: printer}
}
return printer.PrintObj, nil
}
var list []*v1.PartialObjectMetadata
for _, m := range client.Metadata {
var data v1.PartialObjectMetadata
err = json.Unmarshal([]byte(m), &data)
if err != nil {
continue
}
list = append(list, &data)
}
slices.SortStableFunc(list, func(a, b *v1.PartialObjectMetadata) int {
compare := cmp.Compare(a.GetNamespace(), b.GetNamespace())
if compare == 0 {
return cmp.Compare(a.GetName(), b.GetName())
}
return compare
})
for _, m := range list {
printer, err := toPrinter()
if err != nil {
return err
}
_ = printer.PrintObj(m, w)
}
fmt.Fprint(os.Stdout, string(marshal))
return nil
return w.Flush()
},
}
printFlags.AddFlags(cmd)
return cmd
}
109 changes: 59 additions & 50 deletions pkg/daemon/action/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package action

import (
"context"
"encoding/json"
"errors"
"time"

"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"

"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
)
Expand All @@ -21,16 +25,17 @@ func (svr *Server) Get(ctx context.Context, req *rpc.GetRequest) (*rpc.GetRespon
if svr.connect == nil || svr.connect.Context() == nil {
return nil, errors.New("not connected")
}
if svr.gr == nil {
if svr.resourceLists == nil {
restConfig, err := svr.connect.GetFactory().ToRESTConfig()
if err != nil {
return nil, err
}
restConfig.WarningHandler = rest.NoWarnings{}
config, err := discovery.NewDiscoveryClientForConfig(restConfig)
if err != nil {
return nil, err
}
svr.gr, err = restmapper.GetAPIGroupResources(config)
svr.resourceLists, err = discovery.ServerPreferredResources(config)
if err != nil {
return nil, err
}
Expand All @@ -42,73 +47,77 @@ func (svr *Server) Get(ctx context.Context, req *rpc.GetRequest) (*rpc.GetRespon
if err != nil {
return nil, err
}

svr.informer = metadatainformer.NewSharedInformerFactory(forConfig, time.Second*5)
for _, resources := range svr.gr {
for _, apiResources := range resources.VersionedResources {
for _, resource := range apiResources {
have := sets.New[string](resource.Kind, resource.Name, resource.SingularName).Insert(resource.ShortNames...).Has(req.Resource)
if have {
resourcesFor, err := mapper.RESTMapping(schema.GroupKind{
Group: resource.Group,
Kind: resource.Kind,
}, resource.Version)
if err != nil {
return nil, err
}
svr.informer.ForResource(resourcesFor.Resource)
for _, resourceList := range svr.resourceLists {
for _, resource := range resourceList.APIResources {
var groupVersion schema.GroupVersion
groupVersion, err = schema.ParseGroupVersion(resourceList.GroupVersion)
if err != nil {
continue
}
var mapping schema.GroupVersionResource
mapping, err = mapper.ResourceFor(groupVersion.WithResource(resource.Name))
if err != nil {
if meta.IsNoMatchError(err) {
continue
}
return nil, err
}
_ = svr.informer.ForResource(mapping).Informer().SetWatchErrorHandler(func(r *cache.Reflector, err error) {
_, _ = svr.LogFile.Write([]byte(err.Error()))
})
}
}
go svr.informer.Start(svr.connect.Context().Done())
go svr.informer.WaitForCacheSync(make(chan struct{}))
svr.informer.Start(svr.connect.Context().Done())
svr.informer.WaitForCacheSync(ctx.Done())
}
informer, err := svr.getInformer(req)
informer, gvk, err := svr.getInformer(req)
if err != nil {
return nil, err
}
var result []*rpc.Metadata
for _, m := range informer.Informer().GetIndexer().List() {
object, err := meta.Accessor(m)
if err != nil {
return nil, err
var result []string
for _, m := range informer.Informer().GetStore().List() {
objectMetadata, ok := m.(*v1.PartialObjectMetadata)
if ok {
deepCopy := objectMetadata.DeepCopy()
deepCopy.SetGroupVersionKind(*gvk)
deepCopy.ManagedFields = nil
marshal, err := json.Marshal(deepCopy)
if err != nil {
continue
}
result = append(result, string(marshal))
}
result = append(result, &rpc.Metadata{
Name: object.GetName(),
Namespace: object.GetNamespace(),
})
}

return &rpc.GetResponse{Metadata: result}, nil
}

func (svr *Server) getInformer(req *rpc.GetRequest) (informers.GenericInformer, error) {
func (svr *Server) getInformer(req *rpc.GetRequest) (informers.GenericInformer, *schema.GroupVersionKind, error) {
mapper, err := svr.connect.GetFactory().ToRESTMapper()
if err != nil {
return nil, err
return nil, nil, err
}
var resourcesFor *meta.RESTMapping
out:
for _, resources := range svr.gr {
for _, apiResources := range resources.VersionedResources {
for _, resource := range apiResources {
have := sets.New[string](resource.Kind, resource.Name, resource.SingularName).Insert(resource.ShortNames...).Has(req.Resource)
if have {
resourcesFor, err = mapper.RESTMapping(schema.GroupKind{
Group: resource.Group,
Kind: resource.Kind,
}, resource.Version)
if err != nil {
return nil, err
for _, resources := range svr.resourceLists {
for _, resource := range resources.APIResources {
have := sets.New[string](resource.Kind, resource.Name, resource.SingularName).Insert(resource.ShortNames...).Has(req.Resource)
if have {
var groupVersion schema.GroupVersion
groupVersion, err = schema.ParseGroupVersion(resources.GroupVersion)
if err != nil {
continue
}
var mapping schema.GroupVersionResource
mapping, err = mapper.ResourceFor(groupVersion.WithResource(resource.Name))
if err != nil {
if meta.IsNoMatchError(err) {
continue
}
break out
return nil, nil, err
}
return svr.informer.ForResource(mapping), ptr.To(groupVersion.WithKind(resource.Kind)), nil
}
}
}
if resourcesFor == nil {
return nil, errors.New("ErrResourceNotFound")
}

return svr.informer.ForResource(resourcesFor.Resource), nil
return nil, nil, errors.New("ErrResourceNotFound")
}
6 changes: 3 additions & 3 deletions pkg/daemon/action/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"time"

"gopkg.in/natefinch/lumberjack.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/restmapper"

"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
Expand All @@ -28,8 +28,8 @@ type Server struct {
clone *handler.CloneOptions
secondaryConnect []*handler.ConnectOptions

gr []*restmapper.APIGroupResources
informer metadatainformer.SharedInformerFactory
resourceLists []*metav1.APIResourceList
informer metadatainformer.SharedInformerFactory

ID string
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"gopkg.in/natefinch/lumberjack.v2"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/action"
_ "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/handler"
Expand Down Expand Up @@ -49,6 +51,9 @@ func (o *SvrOption) Start(ctx context.Context) error {
}
util.InitLogger(true)
log.SetOutput(l)
klog.SetOutput(l)
klog.LogToStderr(false)
rest.SetDefaultWarningHandler(rest.NoWarnings{})
// every day 00:00:00 rotate log
if !o.IsSudo {
go func() {
Expand Down
Loading

0 comments on commit a26abab

Please sign in to comment.