From 16166c10cb75bf013d0b6f2522de65cbe64ab785 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Fri, 22 Apr 2022 19:17:45 +0530 Subject: [PATCH 01/13] Lvm volumes controller refactor:abhilashshetty04/lvm-driver-dev:4.5 Signed-off-by: Abhilash Shetty --- Makefile | 2 +- Makefile.buildx.mk | 4 +- go.sum | 12 -- pkg/mgmt/lvmnode/builder.go | 5 + pkg/mgmt/lvmnode/start.go | 10 + pkg/mgmt/volume/builder.go | 53 +++++- pkg/mgmt/volume/start.go | 44 ++--- pkg/mgmt/volume/volume.go | 172 +++++++++++------- .../dynamic/dynamicinformer/informer.go | 158 ++++++++++++++++ .../dynamic/dynamicinformer/interface.go | 34 ++++ .../dynamic/dynamiclister/interface.go | 40 ++++ .../client-go/dynamic/dynamiclister/lister.go | 91 +++++++++ .../client-go/dynamic/dynamiclister/shim.go | 87 +++++++++ vendor/modules.txt | 2 + 14 files changed, 606 insertions(+), 108 deletions(-) create mode 100644 vendor/k8s.io/client-go/dynamic/dynamicinformer/informer.go create mode 100644 vendor/k8s.io/client-go/dynamic/dynamicinformer/interface.go create mode 100644 vendor/k8s.io/client-go/dynamic/dynamiclister/interface.go create mode 100644 vendor/k8s.io/client-go/dynamic/dynamiclister/lister.go create mode 100644 vendor/k8s.io/client-go/dynamic/dynamiclister/shim.go diff --git a/Makefile b/Makefile index 0c33fb8c..86c0dcf2 100644 --- a/Makefile +++ b/Makefile @@ -49,7 +49,7 @@ EXTERNAL_TOOLS=\ # By default the organization name is `openebs`. ifeq (${IMAGE_ORG}, ) - IMAGE_ORG="openebs" + IMAGE_ORG="abhilashshetty04" export IMAGE_ORG endif diff --git a/Makefile.buildx.mk b/Makefile.buildx.mk index d33ce6d3..adae6ad6 100644 --- a/Makefile.buildx.mk +++ b/Makefile.buildx.mk @@ -23,7 +23,7 @@ endif # default list of platforms for which multiarch image is built ifeq (${PLATFORMS}, ) - export PLATFORMS="linux/amd64,linux/arm64" + export PLATFORMS="linux/amd64" endif # if IMG_RESULT is unspecified, by default the image will be pushed to registry @@ -39,7 +39,7 @@ else endif # Name of the multiarch image for csi-driver -DOCKERX_IMAGE_CSI_DRIVER:=${IMAGE_ORG}/lvm-driver:${TAG} +DOCKERX_IMAGE_CSI_DRIVER:=${IMAGE_ORG}/lvm-driver-dev:${TAG} .PHONY: docker.buildx docker.buildx: diff --git a/go.sum b/go.sum index 1f3db16f..7e525dce 100644 --- a/go.sum +++ b/go.sum @@ -42,11 +42,9 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -178,7 +176,6 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -249,7 +246,6 @@ github.com/jpillora/go-ogle-analytics v0.0.0-20161213085824-14b04e0594ef/go.mod github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -262,7 +258,6 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -335,7 +330,6 @@ github.com/prometheus/client_golang v0.9.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= -github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= @@ -348,7 +342,6 @@ github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7q github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= -github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0 h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= @@ -357,7 +350,6 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= @@ -370,7 +362,6 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= -github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= @@ -539,7 +530,6 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201112073958-5cba982894dd h1:5CtCZbICpIOFdgO940moixOPjc0178IU44m4EjOO5IY= golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q= @@ -652,7 +642,6 @@ google.golang.org/grpc v1.34.2 h1:gpVtRHX/KWxFrkm5GsqBJ2LHSYVcjjezFU1VsRzgrRU= google.golang.org/grpc v1.34.2/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -675,7 +664,6 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/mgmt/lvmnode/builder.go b/pkg/mgmt/lvmnode/builder.go index 33ec1415..b9e78354 100644 --- a/pkg/mgmt/lvmnode/builder.go +++ b/pkg/mgmt/lvmnode/builder.go @@ -67,6 +67,11 @@ type NodeController struct { ownerRef metav1.OwnerReference } +/*func newNodeController(kubeClient kubernetes.Interface, client dynamic.Interface, + dynInformer dynamicinformer.DynamicSharedInformerFactory) *NodeController { + +}*/ + // NodeControllerBuilder is the builder object for controller. type NodeControllerBuilder struct { NodeController *NodeController diff --git a/pkg/mgmt/lvmnode/start.go b/pkg/mgmt/lvmnode/start.go index 7feacf3c..978bccfd 100644 --- a/pkg/mgmt/lvmnode/start.go +++ b/pkg/mgmt/lvmnode/start.go @@ -53,6 +53,16 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { return errors.Wrap(err, "error building openebs clientset") } + /*openebsClientNew, err := dynamic.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "error building dynamic client for lvmnode cr") + } + + nodeInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(openebsClientNew, 5*time.Minute, + lvm.LvmNamespace, func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", lvm.NodeID).String() + })*/ + // setup watch only on node we are interested in. nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions( openebsClient, 0, informers.WithNamespace(lvm.LvmNamespace), diff --git a/pkg/mgmt/volume/builder.go b/pkg/mgmt/volume/builder.go index a4a96255..607e7bf1 100644 --- a/pkg/mgmt/volume/builder.go +++ b/pkg/mgmt/volume/builder.go @@ -17,11 +17,11 @@ limitations under the License. package volume import ( - clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" - openebsScheme "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset/scheme" - informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" - listers "github.com/openebs/lvm-localpv/pkg/generated/lister/lvm/v1alpha1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/dynamic/dynamiclister" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -33,15 +33,23 @@ import ( const controllerAgentName = "lvmvolume-controller" +var resource = schema.GroupVersionResource{ + Group: "local.openebs.io", + Version: "v1alpha1", + Resource: "lvmvolumes", +} + // VolController is the controller implementation for volume resources type VolController struct { // kubeclientset is a standard kubernetes clientset kubeclientset kubernetes.Interface // clientset is a openebs custom resource package generated for custom API group. - clientset clientset.Interface + //clientset clientset.Interface + clientset dynamic.Interface - VolLister listers.LVMVolumeLister + //VolLister listers.LVMVolumeLister + VolLister dynamiclister.Lister // VolSynced is used for caches sync to get populated VolSynced cache.InformerSynced @@ -58,8 +66,37 @@ type VolController struct { recorder record.EventRecorder } +func newVolController(kubeClient kubernetes.Interface, client dynamic.Interface, + dynInformer dynamicinformer.DynamicSharedInformerFactory) *VolController { + volInformer := dynInformer.ForResource(resource).Informer() + + klog.Infof("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + klog.Infof("Creating lvm volume controller object") + volCtrller := &VolController{ + kubeclientset: kubeClient, + clientset: client, + VolLister: dynamiclister.New(volInformer.GetIndexer(), resource), + VolSynced: volInformer.HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Vol"), + recorder: recorder, + } + klog.Infof("Adding Event handler functions for lvm volume controller") + volInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: volCtrller.addVol, + DeleteFunc: volCtrller.deleteVol, + UpdateFunc: volCtrller.updateVol, + }) + + return volCtrller +} + // VolControllerBuilder is the builder object for controller. -type VolControllerBuilder struct { +/*type VolControllerBuilder struct { VolController *VolController } @@ -132,4 +169,4 @@ func (cb *VolControllerBuilder) Build() (*VolController, error) { return nil, err } return cb.VolController, nil -} +}*/ diff --git a/pkg/mgmt/volume/start.go b/pkg/mgmt/volume/start.go index 3a64de8e..811d32a4 100644 --- a/pkg/mgmt/volume/start.go +++ b/pkg/mgmt/volume/start.go @@ -23,9 +23,10 @@ import ( "time" - clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" - informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" - kubeinformers "k8s.io/client-go/informers" + //informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + //kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -51,14 +52,15 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { return errors.Wrap(err, "error building kubernetes clientset") } - // Building OpenEBS Clientset - openebsClient, err := clientset.NewForConfig(cfg) + // Building dynamic Client to watch lvmvolume cr + //openebsClient, err := clientset.NewForConfig(cfg) + openebsClient, err := dynamic.NewForConfig(cfg) if err != nil { - return errors.Wrap(err, "error building openebs clientset") + return errors.Wrap(err, "error building dynamic client for lvmvolume cr") } - kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) - VolInformerFactory := informers.NewSharedInformerFactory(openebsClient, time.Second*30) + //kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) + VolInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(openebsClient, 5*time.Minute) // Build() fn of all controllers calls AddToScheme to adds all types of this // clientset into the given scheme. // If multiple controllers happen to call this AddToScheme same time, @@ -66,25 +68,23 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { // This lock is used to serialize the AddToScheme call of all controllers. controllerMtx.Lock() - controller, err := NewVolControllerBuilder(). - withKubeClient(kubeClient). - withOpenEBSClient(openebsClient). - withVolSynced(VolInformerFactory). - withVolLister(VolInformerFactory). - withRecorder(kubeClient). - withEventHandler(VolInformerFactory). - withWorkqueueRateLimiting().Build() + /*controller, err := NewVolControllerBuilder(). + withKubeClient(kubeClient). + withOpenEBSClient(openebsClient). + withVolSynced(VolInformerFactory). + withVolLister(VolInformerFactory). + withRecorder(kubeClient). + withEventHandler(VolInformerFactory). + withWorkqueueRateLimiting().Build()*/ + controller := newVolController(kubeClient, openebsClient, VolInformerFactory) // blocking call, can't use defer to release the lock controllerMtx.Unlock() - if err != nil { - return errors.Wrapf(err, "error building controller instance") - } - - go kubeInformerFactory.Start(stopCh) + klog.Info("Starting informer for lvm volume controller") + //go kubeInformerFactory.Start(stopCh) go VolInformerFactory.Start(stopCh) - + klog.Info("Starting Lvm volume controller") // Threadiness defines the number of workers to be launched in Run function return controller.Run(2, stopCh) } diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index 4515c253..ce3adf25 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -18,6 +18,7 @@ package volume import ( "fmt" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "regexp" "sort" "strconv" @@ -27,6 +28,7 @@ import ( apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" "github.com/openebs/lvm-localpv/pkg/lvm" k8serror "k8s.io/apimachinery/pkg/api/errors" + runtimenew "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" @@ -49,7 +51,8 @@ func (c *VolController) syncHandler(key string) error { } // Get the Vol resource with this namespace/name - Vol, err := c.VolLister.LVMVolumes(namespace).Get(name) + klog.Infof("Getting lvmvol object name:%s, ns:%s from cache\n", name, namespace) + unstructuredVol, err := c.VolLister.Namespace(namespace).Get(name) if k8serror.IsNotFound(err) { runtime.HandleError(fmt.Errorf("lvmvolume '%s' has been deleted", key)) return nil @@ -57,23 +60,123 @@ func (c *VolController) syncHandler(key string) error { if err != nil { return err } - VolCopy := Vol.DeepCopy() + vol := &apis.LVMVolume{} + err = runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredVol.UnstructuredContent(), &vol) + //err = runtime.DefaultUnstructuredConverter.FromUnstructured(Vol.UnstructuredContent(), &vol) + if err != nil { + fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + } + VolCopy := vol.DeepCopy() err = c.syncVol(VolCopy) return err } +// addVol is the add event handler for LVMVolume +func (c *VolController) addVol(obj interface{}) { + //Vol, ok := obj.(*apis.LVMVolume) + klog.Infoln("Add was called") + Vol, ok := c.getStructuredObject(obj) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get Vol object %#v", obj)) + return + } + + if lvm.NodeID != Vol.Spec.OwnerNodeID { + return + } + klog.Infof("Got add event for Vol %s", Vol.Name) + c.enqueueVol(Vol) +} + +// updateVol is the update event handler for LVMVolume +func (c *VolController) updateVol(oldObj, newObj interface{}) { + klog.Infoln("Update was called") + //newVol, ok := newObj.(*apis.LVMVolume) + newVol, ok := c.getStructuredObject(newObj) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get Vol object %#v", newVol)) + return + } + + if lvm.NodeID != newVol.Spec.OwnerNodeID { + return + } + + if c.isDeletionCandidate(newVol) { + klog.Infof("Got update event for deleted Vol %s", newVol.Name) + klog.Infof("Deletion timestamp for the volume from UpdateVOl: %v", newVol.ObjectMeta.DeletionTimestamp) + c.enqueueVol(newVol) + } +} + +// deleteVol is the delete event handler for LVMVolume +func (c *VolController) deleteVol(obj interface{}) { + //Vol, ok := obj.(*apis.LVMVolume) + klog.Infoln("Delete was called") + Vol, ok := c.getStructuredObject(obj) + /*if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + Vol, ok = tombstone.Obj.(*apis.LVMVolume) + if !ok { + runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a lvmvolume %#v", obj)) + return + } + }*/ + klog.Infof("structured obj from delete event is vol: %v, ok: %v ", Vol, ok) + if !ok { + unstructuredObj, ok := obj.(*unstructured.Unstructured) + if ok { + tombStone := cache.DeletedFinalStateUnknown{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) + if err != nil { + runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + Vol, ok = tombStone.Obj.(*apis.LVMVolume) + if !ok { + runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a lvmvolume %#v", obj)) + return + } + } + } + + if lvm.NodeID != Vol.Spec.OwnerNodeID { + return + } + + klog.Infof("Got delete event for Vol %s", Vol.Name) + c.enqueueVol(Vol) +} + // enqueueVol takes a LVMVolume resource and converts it into a namespace/name // string which is then put onto the work queue. This method should *not* be // passed resources of any type other than LVMVolume. func (c *VolController) enqueueVol(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { runtime.HandleError(err) return } c.workqueue.Add(key) +} +func (c *VolController) getStructuredObject(obj interface{}) (*apis.LVMVolume, bool) { + unstructuredInterface, ok := obj.(*unstructured.Unstructured) + if ok { + vol := &apis.LVMVolume{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &vol) + if err != nil { + fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + return nil, false + } + fmt.Println("Object from Informer cache: ", vol) + return vol, true + } + return nil, false } // synVol is the function which tries to converge to a desired state for the @@ -88,6 +191,7 @@ func (c *VolController) syncVol(vol *apis.LVMVolume) error { } return err } + // if status is Pending then it means we are creating the volume. // Otherwise, we are just ignoring the event. switch vol.Status.State { @@ -195,64 +299,6 @@ func (c *VolController) transformLVMError(err error) *apis.VolumeError { return volErr } -// addVol is the add event handler for LVMVolume -func (c *VolController) addVol(obj interface{}) { - Vol, ok := obj.(*apis.LVMVolume) - if !ok { - runtime.HandleError(fmt.Errorf("Couldn't get Vol object %#v", obj)) - return - } - - if lvm.NodeID != Vol.Spec.OwnerNodeID { - return - } - klog.Infof("Got add event for Vol %s", Vol.Name) - c.enqueueVol(Vol) -} - -// updateVol is the update event handler for LVMVolume -func (c *VolController) updateVol(oldObj, newObj interface{}) { - - newVol, ok := newObj.(*apis.LVMVolume) - if !ok { - runtime.HandleError(fmt.Errorf("Couldn't get Vol object %#v", newVol)) - return - } - - if lvm.NodeID != newVol.Spec.OwnerNodeID { - return - } - - if c.isDeletionCandidate(newVol) { - klog.Infof("Got update event for deleted Vol %s", newVol.Name) - c.enqueueVol(newVol) - } -} - -// deleteVol is the delete event handler for LVMVolume -func (c *VolController) deleteVol(obj interface{}) { - Vol, ok := obj.(*apis.LVMVolume) - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) - return - } - Vol, ok = tombstone.Obj.(*apis.LVMVolume) - if !ok { - runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a lvmvolume %#v", obj)) - return - } - } - - if lvm.NodeID != Vol.Spec.OwnerNodeID { - return - } - - klog.Infof("Got delete event for Vol %s", Vol.Name) - c.enqueueVol(Vol) -} - // Run will set up the event handlers for types we are interested in, as well // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for diff --git a/vendor/k8s.io/client-go/dynamic/dynamicinformer/informer.go b/vendor/k8s.io/client-go/dynamic/dynamicinformer/informer.go new file mode 100644 index 00000000..40878b40 --- /dev/null +++ b/vendor/k8s.io/client-go/dynamic/dynamicinformer/informer.go @@ -0,0 +1,158 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicinformer + +import ( + "context" + "sync" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamiclister" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" +) + +// NewDynamicSharedInformerFactory constructs a new instance of dynamicSharedInformerFactory for all namespaces. +func NewDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration) DynamicSharedInformerFactory { + return NewFilteredDynamicSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil) +} + +// NewFilteredDynamicSharedInformerFactory constructs a new instance of dynamicSharedInformerFactory. +// Listers obtained via this factory will be subject to the same filters as specified here. +func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) DynamicSharedInformerFactory { + return &dynamicSharedInformerFactory{ + client: client, + defaultResync: defaultResync, + namespace: namespace, + informers: map[schema.GroupVersionResource]informers.GenericInformer{}, + startedInformers: make(map[schema.GroupVersionResource]bool), + tweakListOptions: tweakListOptions, + } +} + +type dynamicSharedInformerFactory struct { + client dynamic.Interface + defaultResync time.Duration + namespace string + + lock sync.Mutex + informers map[schema.GroupVersionResource]informers.GenericInformer + // startedInformers is used for tracking which informers have been started. + // This allows Start() to be called multiple times safely. + startedInformers map[schema.GroupVersionResource]bool + tweakListOptions TweakListOptionsFunc +} + +var _ DynamicSharedInformerFactory = &dynamicSharedInformerFactory{} + +func (f *dynamicSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer { + f.lock.Lock() + defer f.lock.Unlock() + + key := gvr + informer, exists := f.informers[key] + if exists { + return informer + } + + informer = NewFilteredDynamicInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) + f.informers[key] = informer + + return informer +} + +// Start initializes all requested informers. +func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) { + f.lock.Lock() + defer f.lock.Unlock() + + for informerType, informer := range f.informers { + if !f.startedInformers[informerType] { + go informer.Informer().Run(stopCh) + f.startedInformers[informerType] = true + } + } +} + +// WaitForCacheSync waits for all started informers' cache were synced. +func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool { + informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{} + for informerType, informer := range f.informers { + if f.startedInformers[informerType] { + informers[informerType] = informer.Informer() + } + } + return informers + }() + + res := map[schema.GroupVersionResource]bool{} + for informType, informer := range informers { + res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) + } + return res +} + +// NewFilteredDynamicInformer constructs a new informer for a dynamic type. +func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer { + return &dynamicInformer{ + gvr: gvr, + informer: cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.Resource(gvr).Namespace(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.Resource(gvr).Namespace(namespace).Watch(context.TODO(), options) + }, + }, + &unstructured.Unstructured{}, + resyncPeriod, + indexers, + ), + } +} + +type dynamicInformer struct { + informer cache.SharedIndexInformer + gvr schema.GroupVersionResource +} + +var _ informers.GenericInformer = &dynamicInformer{} + +func (d *dynamicInformer) Informer() cache.SharedIndexInformer { + return d.informer +} + +func (d *dynamicInformer) Lister() cache.GenericLister { + return dynamiclister.NewRuntimeObjectShim(dynamiclister.New(d.informer.GetIndexer(), d.gvr)) +} diff --git a/vendor/k8s.io/client-go/dynamic/dynamicinformer/interface.go b/vendor/k8s.io/client-go/dynamic/dynamicinformer/interface.go new file mode 100644 index 00000000..083977c3 --- /dev/null +++ b/vendor/k8s.io/client-go/dynamic/dynamicinformer/interface.go @@ -0,0 +1,34 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicinformer + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" +) + +// DynamicSharedInformerFactory provides access to a shared informer and lister for dynamic client +type DynamicSharedInformerFactory interface { + Start(stopCh <-chan struct{}) + ForResource(gvr schema.GroupVersionResource) informers.GenericInformer + WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool +} + +// TweakListOptionsFunc defines the signature of a helper function +// that wants to provide more listing options to API +type TweakListOptionsFunc func(*metav1.ListOptions) diff --git a/vendor/k8s.io/client-go/dynamic/dynamiclister/interface.go b/vendor/k8s.io/client-go/dynamic/dynamiclister/interface.go new file mode 100644 index 00000000..c39cbee9 --- /dev/null +++ b/vendor/k8s.io/client-go/dynamic/dynamiclister/interface.go @@ -0,0 +1,40 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamiclister + +import ( + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" +) + +// Lister helps list resources. +type Lister interface { + // List lists all resources in the indexer. + List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) + // Get retrieves a resource from the indexer with the given name + Get(name string) (*unstructured.Unstructured, error) + // Namespace returns an object that can list and get resources in a given namespace. + Namespace(namespace string) NamespaceLister +} + +// NamespaceLister helps list and get resources. +type NamespaceLister interface { + // List lists all resources in the indexer for a given namespace. + List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) + // Get retrieves a resource from the indexer for a given namespace and name. + Get(name string) (*unstructured.Unstructured, error) +} diff --git a/vendor/k8s.io/client-go/dynamic/dynamiclister/lister.go b/vendor/k8s.io/client-go/dynamic/dynamiclister/lister.go new file mode 100644 index 00000000..a50fc471 --- /dev/null +++ b/vendor/k8s.io/client-go/dynamic/dynamiclister/lister.go @@ -0,0 +1,91 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamiclister + +import ( + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" +) + +var _ Lister = &dynamicLister{} +var _ NamespaceLister = &dynamicNamespaceLister{} + +// dynamicLister implements the Lister interface. +type dynamicLister struct { + indexer cache.Indexer + gvr schema.GroupVersionResource +} + +// New returns a new Lister. +func New(indexer cache.Indexer, gvr schema.GroupVersionResource) Lister { + return &dynamicLister{indexer: indexer, gvr: gvr} +} + +// List lists all resources in the indexer. +func (l *dynamicLister) List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) { + err = cache.ListAll(l.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*unstructured.Unstructured)) + }) + return ret, err +} + +// Get retrieves a resource from the indexer with the given name +func (l *dynamicLister) Get(name string) (*unstructured.Unstructured, error) { + obj, exists, err := l.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(l.gvr.GroupResource(), name) + } + return obj.(*unstructured.Unstructured), nil +} + +// Namespace returns an object that can list and get resources from a given namespace. +func (l *dynamicLister) Namespace(namespace string) NamespaceLister { + return &dynamicNamespaceLister{indexer: l.indexer, namespace: namespace, gvr: l.gvr} +} + +// dynamicNamespaceLister implements the NamespaceLister interface. +type dynamicNamespaceLister struct { + indexer cache.Indexer + namespace string + gvr schema.GroupVersionResource +} + +// List lists all resources in the indexer for a given namespace. +func (l *dynamicNamespaceLister) List(selector labels.Selector) (ret []*unstructured.Unstructured, err error) { + err = cache.ListAllByNamespace(l.indexer, l.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*unstructured.Unstructured)) + }) + return ret, err +} + +// Get retrieves a resource from the indexer for a given namespace and name. +func (l *dynamicNamespaceLister) Get(name string) (*unstructured.Unstructured, error) { + obj, exists, err := l.indexer.GetByKey(l.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(l.gvr.GroupResource(), name) + } + return obj.(*unstructured.Unstructured), nil +} diff --git a/vendor/k8s.io/client-go/dynamic/dynamiclister/shim.go b/vendor/k8s.io/client-go/dynamic/dynamiclister/shim.go new file mode 100644 index 00000000..92a5f54a --- /dev/null +++ b/vendor/k8s.io/client-go/dynamic/dynamiclister/shim.go @@ -0,0 +1,87 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamiclister + +import ( + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +var _ cache.GenericLister = &dynamicListerShim{} +var _ cache.GenericNamespaceLister = &dynamicNamespaceListerShim{} + +// dynamicListerShim implements the cache.GenericLister interface. +type dynamicListerShim struct { + lister Lister +} + +// NewRuntimeObjectShim returns a new shim for Lister. +// It wraps Lister so that it implements cache.GenericLister interface +func NewRuntimeObjectShim(lister Lister) cache.GenericLister { + return &dynamicListerShim{lister: lister} +} + +// List will return all objects across namespaces +func (s *dynamicListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) { + objs, err := s.lister.List(selector) + if err != nil { + return nil, err + } + + ret = make([]runtime.Object, len(objs)) + for index, obj := range objs { + ret[index] = obj + } + return ret, err +} + +// Get will attempt to retrieve assuming that name==key +func (s *dynamicListerShim) Get(name string) (runtime.Object, error) { + return s.lister.Get(name) +} + +func (s *dynamicListerShim) ByNamespace(namespace string) cache.GenericNamespaceLister { + return &dynamicNamespaceListerShim{ + namespaceLister: s.lister.Namespace(namespace), + } +} + +// dynamicNamespaceListerShim implements the NamespaceLister interface. +// It wraps NamespaceLister so that it implements cache.GenericNamespaceLister interface +type dynamicNamespaceListerShim struct { + namespaceLister NamespaceLister +} + +// List will return all objects in this namespace +func (ns *dynamicNamespaceListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) { + objs, err := ns.namespaceLister.List(selector) + if err != nil { + return nil, err + } + + ret = make([]runtime.Object, len(objs)) + for index, obj := range objs { + ret[index] = obj + } + return ret, err +} + +// Get will attempt to retrieve by namespace and name +func (ns *dynamicNamespaceListerShim) Get(name string) (runtime.Object, error) { + return ns.namespaceLister.Get(name) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 94be1805..1434129b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -438,6 +438,8 @@ k8s.io/apimachinery/third_party/forked/golang/reflect k8s.io/client-go/discovery k8s.io/client-go/discovery/fake k8s.io/client-go/dynamic +k8s.io/client-go/dynamic/dynamicinformer +k8s.io/client-go/dynamic/dynamiclister k8s.io/client-go/informers k8s.io/client-go/informers/admissionregistration k8s.io/client-go/informers/admissionregistration/v1 From 5a8cbeadd6f562f5c6651915bf571ced149788a7 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Fri, 22 Apr 2022 19:36:55 +0530 Subject: [PATCH 02/13] Removed older implementation --- pkg/mgmt/volume/builder.go | 76 -------------------------------------- pkg/mgmt/volume/start.go | 12 +----- pkg/mgmt/volume/volume.go | 12 ------ 3 files changed, 1 insertion(+), 99 deletions(-) diff --git a/pkg/mgmt/volume/builder.go b/pkg/mgmt/volume/builder.go index 607e7bf1..853d3d60 100644 --- a/pkg/mgmt/volume/builder.go +++ b/pkg/mgmt/volume/builder.go @@ -94,79 +94,3 @@ func newVolController(kubeClient kubernetes.Interface, client dynamic.Interface, return volCtrller } - -// VolControllerBuilder is the builder object for controller. -/*type VolControllerBuilder struct { - VolController *VolController -} - -// NewVolControllerBuilder returns an empty instance of controller builder. -func NewVolControllerBuilder() *VolControllerBuilder { - return &VolControllerBuilder{ - VolController: &VolController{}, - } -} - -// withKubeClient fills kube client to controller object. -func (cb *VolControllerBuilder) withKubeClient(ks kubernetes.Interface) *VolControllerBuilder { - cb.VolController.kubeclientset = ks - return cb -} - -// withOpenEBSClient fills openebs client to controller object. -func (cb *VolControllerBuilder) withOpenEBSClient(cs clientset.Interface) *VolControllerBuilder { - cb.VolController.clientset = cs - return cb -} - -// withVolLister fills Vol lister to controller object. -func (cb *VolControllerBuilder) withVolLister(sl informers.SharedInformerFactory) *VolControllerBuilder { - VolInformer := sl.Local().V1alpha1().LVMVolumes() - cb.VolController.VolLister = VolInformer.Lister() - return cb -} - -// withVolSynced adds object sync information in cache to controller object. -func (cb *VolControllerBuilder) withVolSynced(sl informers.SharedInformerFactory) *VolControllerBuilder { - VolInformer := sl.Local().V1alpha1().LVMVolumes() - cb.VolController.VolSynced = VolInformer.Informer().HasSynced - return cb -} - -// withWorkqueue adds workqueue to controller object. -func (cb *VolControllerBuilder) withWorkqueueRateLimiting() *VolControllerBuilder { - cb.VolController.workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Vol") - return cb -} - -// withRecorder adds recorder to controller object. -func (cb *VolControllerBuilder) withRecorder(ks kubernetes.Interface) *VolControllerBuilder { - klog.Infof("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(klog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ks.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - cb.VolController.recorder = recorder - return cb -} - -// withEventHandler adds event handlers controller object. -func (cb *VolControllerBuilder) withEventHandler(cvcInformerFactory informers.SharedInformerFactory) *VolControllerBuilder { - cvcInformer := cvcInformerFactory.Local().V1alpha1().LVMVolumes() - // Set up an event handler for when Vol resources change - cvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: cb.VolController.addVol, - UpdateFunc: cb.VolController.updateVol, - DeleteFunc: cb.VolController.deleteVol, - }) - return cb -} - -// Build returns a controller instance. -func (cb *VolControllerBuilder) Build() (*VolController, error) { - err := openebsScheme.AddToScheme(scheme.Scheme) - if err != nil { - return nil, err - } - return cb.VolController, nil -}*/ diff --git a/pkg/mgmt/volume/start.go b/pkg/mgmt/volume/start.go index 811d32a4..521ca8c8 100644 --- a/pkg/mgmt/volume/start.go +++ b/pkg/mgmt/volume/start.go @@ -59,7 +59,6 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { return errors.Wrap(err, "error building dynamic client for lvmvolume cr") } - //kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) VolInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(openebsClient, 5*time.Minute) // Build() fn of all controllers calls AddToScheme to adds all types of this // clientset into the given scheme. @@ -68,21 +67,12 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { // This lock is used to serialize the AddToScheme call of all controllers. controllerMtx.Lock() - /*controller, err := NewVolControllerBuilder(). - withKubeClient(kubeClient). - withOpenEBSClient(openebsClient). - withVolSynced(VolInformerFactory). - withVolLister(VolInformerFactory). - withRecorder(kubeClient). - withEventHandler(VolInformerFactory). - withWorkqueueRateLimiting().Build()*/ - + //Build Lvm volume controller controller := newVolController(kubeClient, openebsClient, VolInformerFactory) // blocking call, can't use defer to release the lock controllerMtx.Unlock() klog.Info("Starting informer for lvm volume controller") - //go kubeInformerFactory.Start(stopCh) go VolInformerFactory.Start(stopCh) klog.Info("Starting Lvm volume controller") // Threadiness defines the number of workers to be launched in Run function diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index ce3adf25..0deead11 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -114,18 +114,6 @@ func (c *VolController) deleteVol(obj interface{}) { //Vol, ok := obj.(*apis.LVMVolume) klog.Infoln("Delete was called") Vol, ok := c.getStructuredObject(obj) - /*if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) - return - } - Vol, ok = tombstone.Obj.(*apis.LVMVolume) - if !ok { - runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a lvmvolume %#v", obj)) - return - } - }*/ klog.Infof("structured obj from delete event is vol: %v, ok: %v ", Vol, ok) if !ok { unstructuredObj, ok := obj.(*unstructured.Unstructured) From 29aabc2800ccc333efb84d0d8f120fd7de854c4d Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Thu, 28 Apr 2022 17:53:54 +0530 Subject: [PATCH 03/13] refactored code for lvmnode and lvmsnapshot controller in node plugin Signed-off-by: Abhilash Shetty --- Makefile | 2 +- Makefile.buildx.mk | 2 +- pkg/mgmt/lvmnode/builder.go | 129 +++++++++------------------------- pkg/mgmt/lvmnode/lvmnode.go | 54 ++++++++++---- pkg/mgmt/lvmnode/start.go | 57 ++++++++------- pkg/mgmt/snapshot/builder.go | 109 +++++++++------------------- pkg/mgmt/snapshot/snapshot.go | 44 +++++++++--- pkg/mgmt/snapshot/start.go | 33 +++------ pkg/mgmt/volume/builder.go | 7 +- pkg/mgmt/volume/start.go | 11 +-- pkg/mgmt/volume/volume.go | 15 ++-- 11 files changed, 191 insertions(+), 272 deletions(-) diff --git a/Makefile b/Makefile index 86c0dcf2..0c33fb8c 100644 --- a/Makefile +++ b/Makefile @@ -49,7 +49,7 @@ EXTERNAL_TOOLS=\ # By default the organization name is `openebs`. ifeq (${IMAGE_ORG}, ) - IMAGE_ORG="abhilashshetty04" + IMAGE_ORG="openebs" export IMAGE_ORG endif diff --git a/Makefile.buildx.mk b/Makefile.buildx.mk index adae6ad6..da99637a 100644 --- a/Makefile.buildx.mk +++ b/Makefile.buildx.mk @@ -39,7 +39,7 @@ else endif # Name of the multiarch image for csi-driver -DOCKERX_IMAGE_CSI_DRIVER:=${IMAGE_ORG}/lvm-driver-dev:${TAG} +DOCKERX_IMAGE_CSI_DRIVER:=${IMAGE_ORG}/lvm-driver:${TAG} .PHONY: docker.buildx docker.buildx: diff --git a/pkg/mgmt/lvmnode/builder.go b/pkg/mgmt/lvmnode/builder.go index b9e78354..a362120a 100644 --- a/pkg/mgmt/lvmnode/builder.go +++ b/pkg/mgmt/lvmnode/builder.go @@ -17,12 +17,12 @@ package lvmnode import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/dynamic/dynamiclister" "time" - clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" - openebsScheme "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset/scheme" - informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" - listers "github.com/openebs/lvm-localpv/pkg/generated/lister/lvm/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -36,15 +36,23 @@ import ( const controllerAgentName = "lvmnode-controller" +var resource = schema.GroupVersionResource{ + Group: "local.openebs.io", + Version: "v1alpha1", + Resource: "lvmnodes", +} + // NodeController is the controller implementation for lvm node resources + type NodeController struct { // kubeclientset is a standard kubernetes clientset kubeclientset kubernetes.Interface - // clientset is a openebs custom resource package generated for custom API group. - clientset clientset.Interface + // clientset is a interface which will be used to list lvmnode from Api server + clientset dynamic.Interface - NodeLister listers.LVMNodeLister + //NodeLister is used to list lvmnode from informer cache + NodeLister dynamiclister.Lister // NodeSynced is used for caches sync to get populated NodeSynced cache.InformerSynced @@ -67,96 +75,29 @@ type NodeController struct { ownerRef metav1.OwnerReference } -/*func newNodeController(kubeClient kubernetes.Interface, client dynamic.Interface, - dynInformer dynamicinformer.DynamicSharedInformerFactory) *NodeController { - -}*/ - -// NodeControllerBuilder is the builder object for controller. -type NodeControllerBuilder struct { - NodeController *NodeController -} - -// NewNodeControllerBuilder returns an empty instance of controller builder. -func NewNodeControllerBuilder() *NodeControllerBuilder { - return &NodeControllerBuilder{ - NodeController: &NodeController{}, - } -} - -// withKubeClient fills kube client to controller object. -func (cb *NodeControllerBuilder) withKubeClient(ks kubernetes.Interface) *NodeControllerBuilder { - cb.NodeController.kubeclientset = ks - return cb -} - -// withOpenEBSClient fills openebs client to controller object. -func (cb *NodeControllerBuilder) withOpenEBSClient(cs clientset.Interface) *NodeControllerBuilder { - cb.NodeController.clientset = cs - return cb -} - -// withNodeLister fills Node lister to controller object. -func (cb *NodeControllerBuilder) withNodeLister(sl informers.SharedInformerFactory) *NodeControllerBuilder { - NodeInformer := sl.Local().V1alpha1().LVMNodes() - cb.NodeController.NodeLister = NodeInformer.Lister() - return cb -} - -// withNodeSynced adds object sync information in cache to controller object. -func (cb *NodeControllerBuilder) withNodeSynced(sl informers.SharedInformerFactory) *NodeControllerBuilder { - NodeInformer := sl.Local().V1alpha1().LVMNodes() - cb.NodeController.NodeSynced = NodeInformer.Informer().HasSynced - return cb -} - -// withWorkqueue adds workqueue to controller object. -func (cb *NodeControllerBuilder) withWorkqueueRateLimiting() *NodeControllerBuilder { - cb.NodeController.workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Node") - return cb -} - -// withRecorder adds recorder to controller object. -func (cb *NodeControllerBuilder) withRecorder(ks kubernetes.Interface) *NodeControllerBuilder { +func newNodeController(kubeClient kubernetes.Interface, client dynamic.Interface, + dynInformer dynamicinformer.DynamicSharedInformerFactory, ownerRef metav1.OwnerReference) *NodeController { + nodeInformer := dynInformer.ForResource(resource).Informer() klog.Infof("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ks.CoreV1().Events("")}) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - cb.NodeController.recorder = recorder - return cb -} - -// withEventHandler adds event handlers controller object. -func (cb *NodeControllerBuilder) withEventHandler(cvcInformerFactory informers.SharedInformerFactory) *NodeControllerBuilder { - cvcInformer := cvcInformerFactory.Local().V1alpha1().LVMNodes() - // Set up an event handler for when lvm node vg change. - // Note: rather than setting up the resync period at informer level, - // we are controlling the syncing based on pollInternal. See - // NodeController#Run func for more details. - cvcInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ - AddFunc: cb.NodeController.addNode, - UpdateFunc: cb.NodeController.updateNode, - DeleteFunc: cb.NodeController.deleteNode, - }, 0) - return cb -} - -func (cb *NodeControllerBuilder) withPollInterval(interval time.Duration) *NodeControllerBuilder { - cb.NodeController.pollInterval = interval - return cb -} - -func (cb *NodeControllerBuilder) withOwnerReference(ownerRef metav1.OwnerReference) *NodeControllerBuilder { - cb.NodeController.ownerRef = ownerRef - return cb -} - -// Build returns a controller instance. -func (cb *NodeControllerBuilder) Build() (*NodeController, error) { - err := openebsScheme.AddToScheme(scheme.Scheme) - if err != nil { - return nil, err + klog.Infof("Creating lvm node controller object") + nodeContrller := &NodeController{ + kubeclientset: kubeClient, + clientset: client, + NodeLister: dynamiclister.New(nodeInformer.GetIndexer(), resource), + NodeSynced: nodeInformer.HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Node"), + recorder: recorder, + pollInterval: 60 * time.Second, + ownerRef: ownerRef, } - return cb.NodeController, nil + nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: nodeContrller.addNode, + UpdateFunc: nodeContrller.updateNode, + DeleteFunc: nodeContrller.deleteNode, + }) + return nodeContrller } diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go index fc1d2a87..3099594d 100644 --- a/pkg/mgmt/lvmnode/lvmnode.go +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -18,6 +18,8 @@ package lvmnode import ( "fmt" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + runtimenew "k8s.io/apimachinery/pkg/runtime" "reflect" "time" @@ -54,13 +56,19 @@ func (c *NodeController) syncHandler(key string) error { // LVMNode func (c *NodeController) syncNode(namespace string, name string) error { // Get the node resource with this namespace/name - cachedNode, err := c.NodeLister.LVMNodes(namespace).Get(name) + cachedNode, err := c.NodeLister.Namespace(namespace).Get(name) if err != nil && !k8serror.IsNotFound(err) { return err } + + nodeStruct, ok := c.getStructuredObject(cachedNode) + if !ok { + return err + } + var node *apis.LVMNode if cachedNode != nil { - node = cachedNode.DeepCopy() + node = nodeStruct.DeepCopy() } vgs, err := c.listLVMVolumeGroup() @@ -116,9 +124,23 @@ func (c *NodeController) syncNode(namespace string, name string) error { return nil } +func (c *NodeController) getStructuredObject(obj interface{}) (*apis.LVMNode, bool) { + unstructuredInterface, ok := obj.(*unstructured.Unstructured) + if ok { + node := &apis.LVMNode{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &node) + if err != nil { + fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + return nil, false + } + return node, true + } + return nil, false +} + // addNode is the add event handler for LVMNode func (c *NodeController) addNode(obj interface{}) { - node, ok := obj.(*apis.LVMNode) + node, ok := c.getStructuredObject(obj) if !ok { runtime.HandleError(fmt.Errorf("Couldn't get node object %#v", obj)) return @@ -130,7 +152,7 @@ func (c *NodeController) addNode(obj interface{}) { // updateNode is the update event handler for LVMNode func (c *NodeController) updateNode(oldObj, newObj interface{}) { - newNode, ok := newObj.(*apis.LVMNode) + newNode, ok := c.getStructuredObject(newObj) if !ok { runtime.HandleError(fmt.Errorf("Couldn't get node object %#v", newNode)) return @@ -142,17 +164,21 @@ func (c *NodeController) updateNode(oldObj, newObj interface{}) { // deleteNode is the delete event handler for LVMNode func (c *NodeController) deleteNode(obj interface{}) { - node, ok := obj.(*apis.LVMNode) + node, ok := c.getStructuredObject(obj) if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) - return - } - node, ok = tombstone.Obj.(*apis.LVMNode) - if !ok { - runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a LVMNode %#v", obj)) - return + unstructuredObj, ok := obj.(*unstructured.Unstructured) + if ok { + tombStone := cache.DeletedFinalStateUnknown{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + node, ok = tombStone.Obj.(*apis.LVMNode) + if !ok { + runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmvolume %#v", obj)) + return + } } } diff --git a/pkg/mgmt/lvmnode/start.go b/pkg/mgmt/lvmnode/start.go index 978bccfd..35ffd151 100644 --- a/pkg/mgmt/lvmnode/start.go +++ b/pkg/mgmt/lvmnode/start.go @@ -18,18 +18,19 @@ package lvmnode import ( "context" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "sync" "time" k8sapi "github.com/openebs/lib-csi/pkg/client/k8s" - clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" - informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" "github.com/openebs/lvm-localpv/pkg/lvm" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" + "k8s.io/klog" ) // Start starts the lvmnode controller. @@ -47,28 +48,22 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { return errors.Wrap(err, "error building kubernetes clientset") } - // Building OpenEBS Clientset - openebsClient, err := clientset.NewForConfig(cfg) - if err != nil { - return errors.Wrap(err, "error building openebs clientset") - } - - /*openebsClientNew, err := dynamic.NewForConfig(cfg) + openebsClientNew, err := dynamic.NewForConfig(cfg) if err != nil { return errors.Wrap(err, "error building dynamic client for lvmnode cr") } - nodeInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(openebsClientNew, 5*time.Minute, - lvm.LvmNamespace, func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("metadata.name", lvm.NodeID).String() - })*/ - // setup watch only on node we are interested in. - nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions( - openebsClient, 0, informers.WithNamespace(lvm.LvmNamespace), - informers.WithTweakListOptions(func(options *metav1.ListOptions) { + nodeInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(openebsClientNew, 5*time.Minute, + lvm.LvmNamespace, func(options *metav1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("metadata.name", lvm.NodeID).String() - })) + }) + + /*nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions( + openebsClient, 0, informers.WithNamespace(lvm.LvmNamespace), + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", lvm.NodeID).String() + }))*/ k8sNode, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), lvm.NodeID, metav1.GetOptions{}) if err != nil { @@ -94,16 +89,18 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { // This lock is used to serialize the AddToScheme call of all controllers. controllerMtx.Lock() - controller, err := NewNodeControllerBuilder(). - withKubeClient(kubeClient). - withOpenEBSClient(openebsClient). - withNodeSynced(nodeInformerFactory). - withNodeLister(nodeInformerFactory). - withRecorder(kubeClient). - withEventHandler(nodeInformerFactory). - withPollInterval(60 * time.Second). - withOwnerReference(ownerRef). - withWorkqueueRateLimiting().Build() + nodeCtrller := newNodeController(kubeClient, openebsClientNew, nodeInformerFactory, ownerRef) + + /*controller, err := NewNodeControllerBuilder(). + withKubeClient(kubeClient). + withOpenEBSClient(openebsClient). + withNodeSynced(nodeInformerFactory). + withNodeLister(nodeInformerFactory). + withRecorder(kubeClient). + withEventHandler(nodeInformerFactory). + withPollInterval(60 * time.Second). + withOwnerReference(ownerRef). + withWorkqueueRateLimiting().Build()*/ // blocking call, can't use defer to release the lock controllerMtx.Unlock() @@ -112,8 +109,10 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { return errors.Wrapf(err, "error building controller instance") } + klog.Info("Starting informer for lvm node controller") nodeInformerFactory.Start(stopCh) + klog.Info("Starting Lvm node controller") // Threadiness defines the number of workers to be launched in Run function - return controller.Run(1, stopCh) + return nodeCtrller.Run(1, stopCh) } diff --git a/pkg/mgmt/snapshot/builder.go b/pkg/mgmt/snapshot/builder.go index 6454a4b8..9db60804 100644 --- a/pkg/mgmt/snapshot/builder.go +++ b/pkg/mgmt/snapshot/builder.go @@ -17,11 +17,12 @@ limitations under the License. package snapshot import ( - clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" - openebsScheme "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset/scheme" - informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" - listers "github.com/openebs/lvm-localpv/pkg/generated/lister/lvm/v1alpha1" corev1 "k8s.io/api/core/v1" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/dynamic/dynamiclister" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -33,15 +34,22 @@ import ( const controllerAgentName = "lvmsnap-controller" +var resource = schema.GroupVersionResource{ + Group: "local.openebs.io", + Version: "v1alpha1", + Resource: "lvmsnapshots", +} + // SnapController is the controller implementation for Snap resources type SnapController struct { // kubeclientset is a standard kubernetes clientset kubeclientset kubernetes.Interface - // clientset is a openebs custom resource package generated for custom API group. - clientset clientset.Interface + // clientset is a interface which will be used to list lvmsnapshot from Api server + clientset dynamic.Interface - snapLister listers.LVMSnapshotLister + //VolLister is used to list lvmsnapshot from informer cache + snapLister dynamiclister.Lister // snapSynced is used for caches sync to get populated snapSynced cache.InformerSynced @@ -58,78 +66,29 @@ type SnapController struct { recorder record.EventRecorder } -// SnapControllerBuilder is the builder object for controller. -type SnapControllerBuilder struct { - SnapController *SnapController -} - -// NewSnapControllerBuilder returns an empty instance of controller builder. -func NewSnapControllerBuilder() *SnapControllerBuilder { - return &SnapControllerBuilder{ - SnapController: &SnapController{}, - } -} - -// withKubeClient fills kube client to controller object. -func (cb *SnapControllerBuilder) withKubeClient(ks kubernetes.Interface) *SnapControllerBuilder { - cb.SnapController.kubeclientset = ks - return cb -} - -// withOpenEBSClient fills openebs client to controller object. -func (cb *SnapControllerBuilder) withOpenEBSClient(cs clientset.Interface) *SnapControllerBuilder { - cb.SnapController.clientset = cs - return cb -} - -// withSnapLister fills snap lister to controller object. -func (cb *SnapControllerBuilder) withSnapLister(sl informers.SharedInformerFactory) *SnapControllerBuilder { - snapInformer := sl.Local().V1alpha1().LVMSnapshots() - cb.SnapController.snapLister = snapInformer.Lister() - return cb -} - -// withSnapSynced adds object sync information in cache to controller object. -func (cb *SnapControllerBuilder) withSnapSynced(sl informers.SharedInformerFactory) *SnapControllerBuilder { - snapInformer := sl.Local().V1alpha1().LVMSnapshots() - cb.SnapController.snapSynced = snapInformer.Informer().HasSynced - return cb -} - -// withWorkqueue adds workqueue to controller object. -func (cb *SnapControllerBuilder) withWorkqueueRateLimiting() *SnapControllerBuilder { - cb.SnapController.workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Snap") - return cb -} - -// withRecorder adds recorder to controller object. -func (cb *SnapControllerBuilder) withRecorder(ks kubernetes.Interface) *SnapControllerBuilder { +func newSnapController(kubeClient kubernetes.Interface, client dynamic.Interface, + dynInformer dynamicinformer.DynamicSharedInformerFactory) *SnapController { + snapInformer := dynInformer.ForResource(resource).Informer() klog.Infof("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ks.CoreV1().Events("")}) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - cb.SnapController.recorder = recorder - return cb -} -// withEventHandler adds event handlers controller object. -func (cb *SnapControllerBuilder) withEventHandler(cvcInformerFactory informers.SharedInformerFactory) *SnapControllerBuilder { - cvcInformer := cvcInformerFactory.Local().V1alpha1().LVMSnapshots() - // Set up an event handler for when Snap resources change - cvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: cb.SnapController.addSnap, - UpdateFunc: cb.SnapController.updateSnap, - DeleteFunc: cb.SnapController.deleteSnap, - }) - return cb -} - -// Build returns a controller instance. -func (cb *SnapControllerBuilder) Build() (*SnapController, error) { - err := openebsScheme.AddToScheme(scheme.Scheme) - if err != nil { - return nil, err + klog.Infof("Creating lvm snapshot controller object") + snapCtrller := &SnapController{ + kubeclientset: kubeClient, + clientset: client, + snapLister: dynamiclister.New(snapInformer.GetIndexer(), resource), + snapSynced: snapInformer.HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Snap"), + recorder: recorder, } - return cb.SnapController, nil + klog.Infof("Adding Event handler functions for lvm snapshot controller") + snapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: snapCtrller.addSnap, + DeleteFunc: snapCtrller.deleteSnap, + UpdateFunc: snapCtrller.updateSnap, + }) + return snapCtrller } diff --git a/pkg/mgmt/snapshot/snapshot.go b/pkg/mgmt/snapshot/snapshot.go index 3e5a20cb..de86dd2e 100644 --- a/pkg/mgmt/snapshot/snapshot.go +++ b/pkg/mgmt/snapshot/snapshot.go @@ -18,6 +18,8 @@ package snapshot import ( "fmt" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + runtimenew "k8s.io/apimachinery/pkg/runtime" "time" apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" @@ -45,7 +47,7 @@ func (c *SnapController) syncHandler(key string) error { } // Get the snap resource with this namespace/name - snap, err := c.snapLister.LVMSnapshots(namespace).Get(name) + unstructuredSnap, err := c.snapLister.Namespace(namespace).Get(name) if k8serror.IsNotFound(err) { runtime.HandleError(fmt.Errorf("lvm snapshot '%s' has been deleted", key)) return nil @@ -53,6 +55,12 @@ func (c *SnapController) syncHandler(key string) error { if err != nil { return err } + snap := apis.LVMSnapshot{} + err = runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredSnap.UnstructuredContent(), &snap) + //err = runtime.DefaultUnstructuredConverter.FromUnstructured(Vol.UnstructuredContent(), &vol) + if err != nil { + fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + } snapCopy := snap.DeepCopy() err = c.syncSnap(snapCopy) return err @@ -96,23 +104,23 @@ func (c *SnapController) syncSnap(snap *apis.LVMSnapshot) error { // addSnap is the add event handler for LVMSnapshot func (c *SnapController) addSnap(obj interface{}) { - snap, ok := obj.(*apis.LVMSnapshot) + snap, ok := c.getStructuredObject(obj) if !ok { - runtime.HandleError(fmt.Errorf("Couldn't get snap object %#v", obj)) + runtime.HandleError(fmt.Errorf("Couldn't get snaphot object %#v", obj)) return } if lvm.NodeID != snap.Spec.OwnerNodeID { return } - klog.Infof("Got add event for Snap %s/%s", snap.Spec.VolGroup, snap.Name) + klog.Infof("Got add event for Snapshot %s/%s", snap.Spec.VolGroup, snap.Name) + klog.Infof("lvmsnapshot object to be enqueued by Add handler: %v", snap) c.enqueueSnap(snap) } // updateSnap is the update event handler for LVMSnapshot func (c *SnapController) updateSnap(oldObj, newObj interface{}) { - - newSnap, ok := newObj.(*apis.LVMSnapshot) + newSnap, ok := c.getStructuredObject(newObj) if !ok { runtime.HandleError(fmt.Errorf("Couldn't get snap object %#v", newSnap)) return @@ -124,23 +132,23 @@ func (c *SnapController) updateSnap(oldObj, newObj interface{}) { // update on Snapshot CR does not make sense unless it is a deletion candidate if c.isDeletionCandidate(newSnap) { - klog.Infof("Got update event for Snap %s/%s@%s", newSnap.Spec.VolGroup, newSnap.Labels[lvm.LVMVolKey], newSnap.Name) + klog.Infof("Got update event for Snapshot %s/%s@%s", newSnap.Spec.VolGroup, newSnap.Labels[lvm.LVMVolKey], newSnap.Name) c.enqueueSnap(newSnap) } } // deleteSnap is the delete event handler for LVMSnapshot func (c *SnapController) deleteSnap(obj interface{}) { - snap, ok := obj.(*apis.LVMSnapshot) + snap, ok := c.getStructuredObject(obj) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return } snap, ok = tombstone.Obj.(*apis.LVMSnapshot) if !ok { - runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a lvmsnap %#v", obj)) + runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmsnap %#v", obj)) return } } @@ -149,10 +157,24 @@ func (c *SnapController) deleteSnap(obj interface{}) { return } - klog.Infof("Got delete event for Snap %s/%s@%s", snap.Spec.VolGroup, snap.Labels[lvm.LVMVolKey], snap.Name) + klog.Infof("Got delete event for Snaphot %s/%s@%s", snap.Spec.VolGroup, snap.Labels[lvm.LVMVolKey], snap.Name) c.enqueueSnap(snap) } +func (c *SnapController) getStructuredObject(obj interface{}) (*apis.LVMSnapshot, bool) { + unstructuredInterface, ok := obj.(*unstructured.Unstructured) + if ok { + snap := &apis.LVMSnapshot{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &snap) + if err != nil { + fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + return nil, false + } + return snap, true + } + return nil, false +} + // Run will set up the event handlers for types we are interested in, as well // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for diff --git a/pkg/mgmt/snapshot/start.go b/pkg/mgmt/snapshot/start.go index 28f38dd1..e91a3422 100644 --- a/pkg/mgmt/snapshot/start.go +++ b/pkg/mgmt/snapshot/start.go @@ -20,12 +20,10 @@ import ( "sync" "github.com/pkg/errors" - "time" - clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset" - informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" - kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -52,14 +50,12 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { return errors.Wrap(err, "error building kubernetes clientset") } - // Building OpenEBS Clientset - openebsClient, err := clientset.NewForConfig(cfg) + openebsClient, err := dynamic.NewForConfig(cfg) if err != nil { - return errors.Wrap(err, "error building openebs clientset") + return errors.Wrap(err, "error building dynamic client for lvmsnapshot cr") } - kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) - snapInformerFactory := informers.NewSharedInformerFactory(openebsClient, time.Second*30) + snapInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(openebsClient, 5*time.Minute) // Build() fn of all controllers calls AddToScheme to adds all types of this // clientset into the given scheme. // If multiple controllers happen to call this AddToScheme same time, @@ -67,25 +63,14 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { // This lock is used to serialize the AddToScheme call of all controllers. controllerMtx.Lock() - controller, err := NewSnapControllerBuilder(). - withKubeClient(kubeClient). - withOpenEBSClient(openebsClient). - withSnapSynced(snapInformerFactory). - withSnapLister(snapInformerFactory). - withRecorder(kubeClient). - withEventHandler(snapInformerFactory). - withWorkqueueRateLimiting().Build() + controller := newSnapController(kubeClient, openebsClient, snapInformerFactory) // blocking call, can't use defer to release the lock controllerMtx.Unlock() - - if err != nil { - return errors.Wrapf(err, "error building controller instance") - } - - go kubeInformerFactory.Start(stopCh) + + klog.Info("Starting informer for lvm snapshot controller") go snapInformerFactory.Start(stopCh) - + klog.Info("Starting Lvm snapshot controller") // Threadiness defines the number of workers to be launched in Run function return controller.Run(2, stopCh) } diff --git a/pkg/mgmt/volume/builder.go b/pkg/mgmt/volume/builder.go index 853d3d60..c95af9ca 100644 --- a/pkg/mgmt/volume/builder.go +++ b/pkg/mgmt/volume/builder.go @@ -44,11 +44,10 @@ type VolController struct { // kubeclientset is a standard kubernetes clientset kubeclientset kubernetes.Interface - // clientset is a openebs custom resource package generated for custom API group. - //clientset clientset.Interface + // clientset is a interface which will be used to list lvmvolumes from Api server clientset dynamic.Interface - //VolLister listers.LVMVolumeLister + //VolLister is used to list lvmvolumes from informer cache VolLister dynamiclister.Lister // VolSynced is used for caches sync to get populated @@ -68,8 +67,8 @@ type VolController struct { func newVolController(kubeClient kubernetes.Interface, client dynamic.Interface, dynInformer dynamicinformer.DynamicSharedInformerFactory) *VolController { - volInformer := dynInformer.ForResource(resource).Informer() + volInformer := dynInformer.ForResource(resource).Informer() klog.Infof("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) diff --git a/pkg/mgmt/volume/start.go b/pkg/mgmt/volume/start.go index 521ca8c8..245154aa 100644 --- a/pkg/mgmt/volume/start.go +++ b/pkg/mgmt/volume/start.go @@ -17,16 +17,12 @@ limitations under the License. package volume import ( - "sync" - "github.com/pkg/errors" - - "time" - - //informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" - //kubeinformers "k8s.io/client-go/informers" + "sync" + "time" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -53,7 +49,6 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { } // Building dynamic Client to watch lvmvolume cr - //openebsClient, err := clientset.NewForConfig(cfg) openebsClient, err := dynamic.NewForConfig(cfg) if err != nil { return errors.Wrap(err, "error building dynamic client for lvmvolume cr") diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index 0deead11..e4fd23e8 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -73,8 +73,6 @@ func (c *VolController) syncHandler(key string) error { // addVol is the add event handler for LVMVolume func (c *VolController) addVol(obj interface{}) { - //Vol, ok := obj.(*apis.LVMVolume) - klog.Infoln("Add was called") Vol, ok := c.getStructuredObject(obj) if !ok { runtime.HandleError(fmt.Errorf("Couldn't get Vol object %#v", obj)) @@ -85,13 +83,12 @@ func (c *VolController) addVol(obj interface{}) { return } klog.Infof("Got add event for Vol %s", Vol.Name) + klog.Infof("lvmvolume object to be enqueued by Add handler: %v", Vol) c.enqueueVol(Vol) } // updateVol is the update event handler for LVMVolume func (c *VolController) updateVol(oldObj, newObj interface{}) { - klog.Infoln("Update was called") - //newVol, ok := newObj.(*apis.LVMVolume) newVol, ok := c.getStructuredObject(newObj) if !ok { runtime.HandleError(fmt.Errorf("Couldn't get Vol object %#v", newVol)) @@ -111,22 +108,19 @@ func (c *VolController) updateVol(oldObj, newObj interface{}) { // deleteVol is the delete event handler for LVMVolume func (c *VolController) deleteVol(obj interface{}) { - //Vol, ok := obj.(*apis.LVMVolume) - klog.Infoln("Delete was called") Vol, ok := c.getStructuredObject(obj) - klog.Infof("structured obj from delete event is vol: %v, ok: %v ", Vol, ok) if !ok { unstructuredObj, ok := obj.(*unstructured.Unstructured) if ok { tombStone := cache.DeletedFinalStateUnknown{} err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) if err != nil { - runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return } Vol, ok = tombStone.Obj.(*apis.LVMVolume) if !ok { - runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a lvmvolume %#v", obj)) + runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmvolume %#v", obj)) return } } @@ -158,10 +152,9 @@ func (c *VolController) getStructuredObject(obj interface{}) (*apis.LVMVolume, b vol := &apis.LVMVolume{} err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &vol) if err != nil { - fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + klog.Infof("err %s, While converting unstructured obj to typed object\n", err.Error()) return nil, false } - fmt.Println("Object from Informer cache: ", vol) return vol, true } return nil, false From 840057502849a2042f9fe29ad9bd779197851af1 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Thu, 28 Apr 2022 18:43:11 +0530 Subject: [PATCH 04/13] reverting changes on buildx file Signed-off-by: Abhilash Shetty --- Makefile.buildx.mk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile.buildx.mk b/Makefile.buildx.mk index da99637a..d33ce6d3 100644 --- a/Makefile.buildx.mk +++ b/Makefile.buildx.mk @@ -23,7 +23,7 @@ endif # default list of platforms for which multiarch image is built ifeq (${PLATFORMS}, ) - export PLATFORMS="linux/amd64" + export PLATFORMS="linux/amd64,linux/arm64" endif # if IMG_RESULT is unspecified, by default the image will be pushed to registry From 7252e26312df8b09a6847b2bb8ac0a494088f27e Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Fri, 29 Apr 2022 19:02:19 +0530 Subject: [PATCH 05/13] Fixed :invalid memory address or nil pointer dereference Signed-off-by: Abhilash Shetty --- pkg/mgmt/lvmnode/lvmnode.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go index 3099594d..ebb81a14 100644 --- a/pkg/mgmt/lvmnode/lvmnode.go +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -61,13 +61,12 @@ func (c *NodeController) syncNode(namespace string, name string) error { return err } - nodeStruct, ok := c.getStructuredObject(cachedNode) - if !ok { - return err - } - var node *apis.LVMNode if cachedNode != nil { + nodeStruct, ok := c.getStructuredObject(cachedNode) + if !ok { + return err + } node = nodeStruct.DeepCopy() } From 6819f6177d33fb898b826569494dc2b3ecdc9574 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Sat, 14 May 2022 22:07:56 +0530 Subject: [PATCH 06/13] Made changes to order of the import statements and changed fmt usage to klog Signed-off-by: Abhilash Shetty --- pkg/mgmt/lvmnode/start.go | 9 ++++----- pkg/mgmt/snapshot/builder.go | 1 - pkg/mgmt/snapshot/snapshot.go | 6 +++--- pkg/mgmt/snapshot/start.go | 8 +++----- pkg/mgmt/volume/start.go | 6 +++--- pkg/mgmt/volume/volume.go | 4 ++-- 6 files changed, 15 insertions(+), 19 deletions(-) diff --git a/pkg/mgmt/lvmnode/start.go b/pkg/mgmt/lvmnode/start.go index 35ffd151..260060a2 100644 --- a/pkg/mgmt/lvmnode/start.go +++ b/pkg/mgmt/lvmnode/start.go @@ -18,19 +18,18 @@ package lvmnode import ( "context" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" - "sync" - "time" - k8sapi "github.com/openebs/lib-csi/pkg/client/k8s" "github.com/openebs/lvm-localpv/pkg/lvm" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" "k8s.io/klog" + "sync" + "time" ) // Start starts the lvmnode controller. diff --git a/pkg/mgmt/snapshot/builder.go b/pkg/mgmt/snapshot/builder.go index 9db60804..6102274b 100644 --- a/pkg/mgmt/snapshot/builder.go +++ b/pkg/mgmt/snapshot/builder.go @@ -18,7 +18,6 @@ package snapshot import ( corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" diff --git a/pkg/mgmt/snapshot/snapshot.go b/pkg/mgmt/snapshot/snapshot.go index de86dd2e..06fc26ca 100644 --- a/pkg/mgmt/snapshot/snapshot.go +++ b/pkg/mgmt/snapshot/snapshot.go @@ -18,17 +18,17 @@ package snapshot import ( "fmt" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - runtimenew "k8s.io/apimachinery/pkg/runtime" - "time" apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" lvm "github.com/openebs/lvm-localpv/pkg/lvm" k8serror "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + runtimenew "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/klog" + "time" ) // isDeletionCandidate checks if a lvm snapshot is a deletion candidate. diff --git a/pkg/mgmt/snapshot/start.go b/pkg/mgmt/snapshot/start.go index e91a3422..26478cb8 100644 --- a/pkg/mgmt/snapshot/start.go +++ b/pkg/mgmt/snapshot/start.go @@ -17,17 +17,15 @@ limitations under the License. package snapshot import ( - "sync" - "github.com/pkg/errors" - "time" - "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" + "sync" + "time" ) var ( @@ -67,7 +65,7 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { // blocking call, can't use defer to release the lock controllerMtx.Unlock() - + klog.Info("Starting informer for lvm snapshot controller") go snapInformerFactory.Start(stopCh) klog.Info("Starting Lvm snapshot controller") diff --git a/pkg/mgmt/volume/start.go b/pkg/mgmt/volume/start.go index 245154aa..080a8d47 100644 --- a/pkg/mgmt/volume/start.go +++ b/pkg/mgmt/volume/start.go @@ -17,12 +17,12 @@ limitations under the License. package volume import ( - "github.com/pkg/errors" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" "sync" "time" + "github.com/pkg/errors" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index e4fd23e8..13bba147 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -18,7 +18,6 @@ package volume import ( "fmt" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "regexp" "sort" "strconv" @@ -28,6 +27,7 @@ import ( apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" "github.com/openebs/lvm-localpv/pkg/lvm" k8serror "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" runtimenew "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -64,7 +64,7 @@ func (c *VolController) syncHandler(key string) error { err = runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredVol.UnstructuredContent(), &vol) //err = runtime.DefaultUnstructuredConverter.FromUnstructured(Vol.UnstructuredContent(), &vol) if err != nil { - fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + klog.Infof("err %s, While converting unstructured obj to typed object\n", err.Error()) } VolCopy := vol.DeepCopy() err = c.syncVol(VolCopy) From 6444d2bea2749275f9d51d59a3914981382ea509 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Tue, 24 May 2022 21:56:06 +0530 Subject: [PATCH 07/13] Incorporated review comments Signed-off-by: Abhilash Shetty --- pkg/mgmt/lvmnode/builder.go | 12 +++++++++--- pkg/mgmt/lvmnode/lvmnode.go | 6 +++++- pkg/mgmt/lvmnode/start.go | 17 ----------------- pkg/mgmt/snapshot/builder.go | 12 +++++++++--- pkg/mgmt/snapshot/snapshot.go | 26 ++++++++++++++++---------- pkg/mgmt/volume/builder.go | 12 +++++++++--- pkg/mgmt/volume/volume.go | 8 +++++--- 7 files changed, 53 insertions(+), 40 deletions(-) diff --git a/pkg/mgmt/lvmnode/builder.go b/pkg/mgmt/lvmnode/builder.go index a362120a..f07d4f68 100644 --- a/pkg/mgmt/lvmnode/builder.go +++ b/pkg/mgmt/lvmnode/builder.go @@ -36,10 +36,16 @@ import ( const controllerAgentName = "lvmnode-controller" +const ( + GroupOpenebsIO = "local.openebs.io" + VersionV1alpha1 = "v1alpha1" + Resource = "lvmnodes" +) + var resource = schema.GroupVersionResource{ - Group: "local.openebs.io", - Version: "v1alpha1", - Resource: "lvmnodes", + Group: GroupOpenebsIO, + Version: VersionV1alpha1, + Resource: Resource, } // NodeController is the controller implementation for lvm node resources diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go index ebb81a14..d8588b27 100644 --- a/pkg/mgmt/lvmnode/lvmnode.go +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -65,7 +65,7 @@ func (c *NodeController) syncNode(namespace string, name string) error { if cachedNode != nil { nodeStruct, ok := c.getStructuredObject(cachedNode) if !ok { - return err + return fmt.Errorf("couldn't get node object %#v", cachedNode) } node = nodeStruct.DeepCopy() } @@ -134,6 +134,7 @@ func (c *NodeController) getStructuredObject(obj interface{}) (*apis.LVMNode, bo } return node, true } + runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) return nil, false } @@ -178,6 +179,9 @@ func (c *NodeController) deleteNode(obj interface{}) { runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmvolume %#v", obj)) return } + } else { + runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) + return } } diff --git a/pkg/mgmt/lvmnode/start.go b/pkg/mgmt/lvmnode/start.go index 260060a2..67a7fd1b 100644 --- a/pkg/mgmt/lvmnode/start.go +++ b/pkg/mgmt/lvmnode/start.go @@ -58,12 +58,6 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { options.FieldSelector = fields.OneTermEqualSelector("metadata.name", lvm.NodeID).String() }) - /*nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions( - openebsClient, 0, informers.WithNamespace(lvm.LvmNamespace), - informers.WithTweakListOptions(func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("metadata.name", lvm.NodeID).String() - }))*/ - k8sNode, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), lvm.NodeID, metav1.GetOptions{}) if err != nil { return errors.Wrapf(err, "fetch k8s node %s", lvm.NodeID) @@ -90,17 +84,6 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { nodeCtrller := newNodeController(kubeClient, openebsClientNew, nodeInformerFactory, ownerRef) - /*controller, err := NewNodeControllerBuilder(). - withKubeClient(kubeClient). - withOpenEBSClient(openebsClient). - withNodeSynced(nodeInformerFactory). - withNodeLister(nodeInformerFactory). - withRecorder(kubeClient). - withEventHandler(nodeInformerFactory). - withPollInterval(60 * time.Second). - withOwnerReference(ownerRef). - withWorkqueueRateLimiting().Build()*/ - // blocking call, can't use defer to release the lock controllerMtx.Unlock() diff --git a/pkg/mgmt/snapshot/builder.go b/pkg/mgmt/snapshot/builder.go index 6102274b..7a2b2bff 100644 --- a/pkg/mgmt/snapshot/builder.go +++ b/pkg/mgmt/snapshot/builder.go @@ -33,10 +33,16 @@ import ( const controllerAgentName = "lvmsnap-controller" +const ( + GroupOpenebsIO = "local.openebs.io" + VersionV1alpha1 = "v1alpha1" + Resource = "lvmsnapshots" +) + var resource = schema.GroupVersionResource{ - Group: "local.openebs.io", - Version: "v1alpha1", - Resource: "lvmsnapshots", + Group: GroupOpenebsIO, + Version: VersionV1alpha1, + Resource: Resource, } // SnapController is the controller implementation for Snap resources diff --git a/pkg/mgmt/snapshot/snapshot.go b/pkg/mgmt/snapshot/snapshot.go index 06fc26ca..8a4785de 100644 --- a/pkg/mgmt/snapshot/snapshot.go +++ b/pkg/mgmt/snapshot/snapshot.go @@ -57,7 +57,6 @@ func (c *SnapController) syncHandler(key string) error { } snap := apis.LVMSnapshot{} err = runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredSnap.UnstructuredContent(), &snap) - //err = runtime.DefaultUnstructuredConverter.FromUnstructured(Vol.UnstructuredContent(), &vol) if err != nil { fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) } @@ -114,7 +113,6 @@ func (c *SnapController) addSnap(obj interface{}) { return } klog.Infof("Got add event for Snapshot %s/%s", snap.Spec.VolGroup, snap.Name) - klog.Infof("lvmsnapshot object to be enqueued by Add handler: %v", snap) c.enqueueSnap(snap) } @@ -141,14 +139,21 @@ func (c *SnapController) updateSnap(oldObj, newObj interface{}) { func (c *SnapController) deleteSnap(obj interface{}) { snap, ok := c.getStructuredObject(obj) if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) - return - } - snap, ok = tombstone.Obj.(*apis.LVMSnapshot) - if !ok { - runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmsnap %#v", obj)) + unstructuredObj, ok := obj.(*unstructured.Unstructured) + if ok { + tombStone := cache.DeletedFinalStateUnknown{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + snap, ok = tombStone.Obj.(*apis.LVMSnapshot) + if !ok { + runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmvolume %#v", obj)) + return + } + } else { + runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) return } } @@ -172,6 +177,7 @@ func (c *SnapController) getStructuredObject(obj interface{}) (*apis.LVMSnapshot } return snap, true } + runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) return nil, false } diff --git a/pkg/mgmt/volume/builder.go b/pkg/mgmt/volume/builder.go index c95af9ca..f08c1f3e 100644 --- a/pkg/mgmt/volume/builder.go +++ b/pkg/mgmt/volume/builder.go @@ -33,10 +33,16 @@ import ( const controllerAgentName = "lvmvolume-controller" +const ( + GroupOpenebsIO = "local.openebs.io" + VersionV1alpha1 = "v1alpha1" + Resource = "lvmvolumes" +) + var resource = schema.GroupVersionResource{ - Group: "local.openebs.io", - Version: "v1alpha1", - Resource: "lvmvolumes", + Group: GroupOpenebsIO, + Version: VersionV1alpha1, + Resource: Resource, } // VolController is the controller implementation for volume resources diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index 13bba147..00714f41 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -62,7 +62,6 @@ func (c *VolController) syncHandler(key string) error { } vol := &apis.LVMVolume{} err = runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredVol.UnstructuredContent(), &vol) - //err = runtime.DefaultUnstructuredConverter.FromUnstructured(Vol.UnstructuredContent(), &vol) if err != nil { klog.Infof("err %s, While converting unstructured obj to typed object\n", err.Error()) } @@ -100,8 +99,7 @@ func (c *VolController) updateVol(oldObj, newObj interface{}) { } if c.isDeletionCandidate(newVol) { - klog.Infof("Got update event for deleted Vol %s", newVol.Name) - klog.Infof("Deletion timestamp for the volume from UpdateVOl: %v", newVol.ObjectMeta.DeletionTimestamp) + klog.Infof("Got update event for deleted Vol %s, Deletion timestamp ", newVol.Name, newVol.ObjectMeta.DeletionTimestamp) c.enqueueVol(newVol) } } @@ -123,6 +121,9 @@ func (c *VolController) deleteVol(obj interface{}) { runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmvolume %#v", obj)) return } + } else { + runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) + return } } @@ -157,6 +158,7 @@ func (c *VolController) getStructuredObject(obj interface{}) (*apis.LVMVolume, b } return vol, true } + runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) return nil, false } From fa51b8bca3bc1844ad992fc4e69375434697da52 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Tue, 24 May 2022 23:28:41 +0530 Subject: [PATCH 08/13] Syntax fix Signed-off-by: Abhilash Shetty --- pkg/mgmt/volume/volume.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index 00714f41..1cfd5ef9 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -99,7 +99,7 @@ func (c *VolController) updateVol(oldObj, newObj interface{}) { } if c.isDeletionCandidate(newVol) { - klog.Infof("Got update event for deleted Vol %s, Deletion timestamp ", newVol.Name, newVol.ObjectMeta.DeletionTimestamp) + klog.Infof("Got update event for deleted Vol %s, Deletion timestamp %s", newVol.Name, newVol.ObjectMeta.DeletionTimestamp) c.enqueueVol(newVol) } } From ad75ec22c549890339a9b4cfbc6526863c797465 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Thu, 26 May 2022 15:30:45 +0530 Subject: [PATCH 09/13] Added descriptions for newly added functions Signed-off-by: Abhilash Shetty --- pkg/mgmt/lvmnode/builder.go | 8 ++++---- pkg/mgmt/lvmnode/lvmnode.go | 1 + pkg/mgmt/snapshot/builder.go | 1 + pkg/mgmt/snapshot/snapshot.go | 1 + pkg/mgmt/volume/builder.go | 1 + pkg/mgmt/volume/volume.go | 1 + 6 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/mgmt/lvmnode/builder.go b/pkg/mgmt/lvmnode/builder.go index f07d4f68..67499f45 100644 --- a/pkg/mgmt/lvmnode/builder.go +++ b/pkg/mgmt/lvmnode/builder.go @@ -17,14 +17,12 @@ package lvmnode import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/dynamic/dynamiclister" - "time" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -32,6 +30,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" + "time" ) const controllerAgentName = "lvmnode-controller" @@ -81,6 +80,7 @@ type NodeController struct { ownerRef metav1.OwnerReference } +//This function returns controller object with all required keys set to watch over lvmnode object func newNodeController(kubeClient kubernetes.Interface, client dynamic.Interface, dynInformer dynamicinformer.DynamicSharedInformerFactory, ownerRef metav1.OwnerReference) *NodeController { nodeInformer := dynInformer.ForResource(resource).Informer() diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go index d8588b27..92d816ae 100644 --- a/pkg/mgmt/lvmnode/lvmnode.go +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -123,6 +123,7 @@ func (c *NodeController) syncNode(namespace string, name string) error { return nil } +//Obj from queue is not readily in lvmnode type. This function would convert obj into lvmnode type. func (c *NodeController) getStructuredObject(obj interface{}) (*apis.LVMNode, bool) { unstructuredInterface, ok := obj.(*unstructured.Unstructured) if ok { diff --git a/pkg/mgmt/snapshot/builder.go b/pkg/mgmt/snapshot/builder.go index 7a2b2bff..79830187 100644 --- a/pkg/mgmt/snapshot/builder.go +++ b/pkg/mgmt/snapshot/builder.go @@ -71,6 +71,7 @@ type SnapController struct { recorder record.EventRecorder } +//This function returns controller object with all required keys set to watch over lvmsnapshot object func newSnapController(kubeClient kubernetes.Interface, client dynamic.Interface, dynInformer dynamicinformer.DynamicSharedInformerFactory) *SnapController { snapInformer := dynInformer.ForResource(resource).Informer() diff --git a/pkg/mgmt/snapshot/snapshot.go b/pkg/mgmt/snapshot/snapshot.go index 8a4785de..ba88e664 100644 --- a/pkg/mgmt/snapshot/snapshot.go +++ b/pkg/mgmt/snapshot/snapshot.go @@ -166,6 +166,7 @@ func (c *SnapController) deleteSnap(obj interface{}) { c.enqueueSnap(snap) } +//Obj from queue is not readily in lvmsnapshot type. This function would convert obj into lvmsnapshot type. func (c *SnapController) getStructuredObject(obj interface{}) (*apis.LVMSnapshot, bool) { unstructuredInterface, ok := obj.(*unstructured.Unstructured) if ok { diff --git a/pkg/mgmt/volume/builder.go b/pkg/mgmt/volume/builder.go index f08c1f3e..6439ab5f 100644 --- a/pkg/mgmt/volume/builder.go +++ b/pkg/mgmt/volume/builder.go @@ -71,6 +71,7 @@ type VolController struct { recorder record.EventRecorder } +//This function returns controller object with all required keys set to watch over lvmvolume object func newVolController(kubeClient kubernetes.Interface, client dynamic.Interface, dynInformer dynamicinformer.DynamicSharedInformerFactory) *VolController { diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index 1cfd5ef9..7446eb36 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -147,6 +147,7 @@ func (c *VolController) enqueueVol(obj interface{}) { c.workqueue.Add(key) } +//Obj from queue is not readily in lvmvol type. This function would convert obj into lvmvolume type. func (c *VolController) getStructuredObject(obj interface{}) (*apis.LVMVolume, bool) { unstructuredInterface, ok := obj.(*unstructured.Unstructured) if ok { From 946fb579e79b7b99acc3fa897588dbc21cd755a2 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Mon, 30 May 2022 13:37:24 +0530 Subject: [PATCH 10/13] Changes Incorporating review comments Signed-off-by: Abhilash Shetty --- pkg/mgmt/lvmnode/builder.go | 9 +++--- pkg/mgmt/lvmnode/lvmnode.go | 54 +++++++++++++++++------------------ pkg/mgmt/lvmnode/start.go | 4 +-- pkg/mgmt/snapshot/builder.go | 12 ++++---- pkg/mgmt/snapshot/snapshot.go | 51 +++++++++++++++++---------------- pkg/mgmt/volume/builder.go | 9 +++--- pkg/mgmt/volume/volume.go | 46 ++++++++++++++--------------- 7 files changed, 92 insertions(+), 93 deletions(-) diff --git a/pkg/mgmt/lvmnode/builder.go b/pkg/mgmt/lvmnode/builder.go index 67499f45..9f4725a4 100644 --- a/pkg/mgmt/lvmnode/builder.go +++ b/pkg/mgmt/lvmnode/builder.go @@ -33,12 +33,11 @@ import ( "time" ) -const controllerAgentName = "lvmnode-controller" - const ( - GroupOpenebsIO = "local.openebs.io" - VersionV1alpha1 = "v1alpha1" - Resource = "lvmnodes" + controllerAgentName = "lvmnode-controller" + GroupOpenebsIO = "local.openebs.io" + VersionV1alpha1 = "v1alpha1" + Resource = "lvmnodes" ) var resource = schema.GroupVersionResource{ diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go index 92d816ae..ca77b2d4 100644 --- a/pkg/mgmt/lvmnode/lvmnode.go +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -18,6 +18,7 @@ package lvmnode import ( "fmt" + "github.com/openebs/lib-csi/pkg/common/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" runtimenew "k8s.io/apimachinery/pkg/runtime" "reflect" @@ -45,7 +46,7 @@ func (c *NodeController) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + runtime.HandleError(errors.Errorf("invalid resource key: %s", key)) return nil } @@ -65,7 +66,7 @@ func (c *NodeController) syncNode(namespace string, name string) error { if cachedNode != nil { nodeStruct, ok := c.getStructuredObject(cachedNode) if !ok { - return fmt.Errorf("couldn't get node object %#v", cachedNode) + return errors.Errorf("couldn't get node object %#v", cachedNode) } node = nodeStruct.DeepCopy() } @@ -86,7 +87,7 @@ func (c *NodeController) syncNode(namespace string, name string) error { klog.Infof("lvm node controller: creating new node object for %+v", node) if _, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Create(node); err != nil { - return fmt.Errorf("create lvm node %s/%s: %v", namespace, name, err) + return errors.Errorf("create lvm node %s/%s: %v", namespace, name, err) } klog.Infof("lvm node controller: created node object %s/%s", namespace, name) return nil @@ -116,7 +117,7 @@ func (c *NodeController) syncNode(namespace string, name string) error { klog.Infof("lvm node controller: updating node object with %+v", node) if _, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Update(node); err != nil { - return fmt.Errorf("update lvm node %s/%s: %v", namespace, name, err) + return errors.Errorf("update lvm node %s/%s: %v", namespace, name, err) } klog.Infof("lvm node controller: updated node object %s/%s", namespace, name) @@ -126,17 +127,17 @@ func (c *NodeController) syncNode(namespace string, name string) error { //Obj from queue is not readily in lvmnode type. This function would convert obj into lvmnode type. func (c *NodeController) getStructuredObject(obj interface{}) (*apis.LVMNode, bool) { unstructuredInterface, ok := obj.(*unstructured.Unstructured) - if ok { - node := &apis.LVMNode{} - err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &node) - if err != nil { - fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) - return nil, false - } - return node, true + if !ok { + runtime.HandleError(errors.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) + return nil, false } - runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) - return nil, false + node := &apis.LVMNode{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &node) + if err != nil { + fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + return nil, false + } + return node, true } // addNode is the add event handler for LVMNode @@ -168,22 +169,21 @@ func (c *NodeController) deleteNode(obj interface{}) { node, ok := c.getStructuredObject(obj) if !ok { unstructuredObj, ok := obj.(*unstructured.Unstructured) - if ok { - tombStone := cache.DeletedFinalStateUnknown{} - err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) - if err != nil { - runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) - return - } - node, ok = tombStone.Obj.(*apis.LVMNode) - if !ok { - runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmvolume %#v", obj)) - return - } - } else { + if !ok { runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) return } + tombStone := cache.DeletedFinalStateUnknown{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + node, ok = tombStone.Obj.(*apis.LVMNode) + if !ok { + runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmnode %#v", obj)) + return + } } klog.Infof("Got delete event for node %s/%s", node.Namespace, node.Name) diff --git a/pkg/mgmt/lvmnode/start.go b/pkg/mgmt/lvmnode/start.go index 67a7fd1b..b0b19126 100644 --- a/pkg/mgmt/lvmnode/start.go +++ b/pkg/mgmt/lvmnode/start.go @@ -82,7 +82,7 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { // This lock is used to serialize the AddToScheme call of all controllers. controllerMtx.Lock() - nodeCtrller := newNodeController(kubeClient, openebsClientNew, nodeInformerFactory, ownerRef) + controller := newNodeController(kubeClient, openebsClientNew, nodeInformerFactory, ownerRef) // blocking call, can't use defer to release the lock controllerMtx.Unlock() @@ -96,5 +96,5 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { klog.Info("Starting Lvm node controller") // Threadiness defines the number of workers to be launched in Run function - return nodeCtrller.Run(1, stopCh) + return controller.Run(1, stopCh) } diff --git a/pkg/mgmt/snapshot/builder.go b/pkg/mgmt/snapshot/builder.go index 79830187..4e81af8d 100644 --- a/pkg/mgmt/snapshot/builder.go +++ b/pkg/mgmt/snapshot/builder.go @@ -31,12 +31,11 @@ import ( "k8s.io/klog" ) -const controllerAgentName = "lvmsnap-controller" - const ( - GroupOpenebsIO = "local.openebs.io" - VersionV1alpha1 = "v1alpha1" - Resource = "lvmsnapshots" + controllerAgentName = "lvmsnap-controller" + GroupOpenebsIO = "local.openebs.io" + VersionV1alpha1 = "v1alpha1" + Resource = "lvmsnapshots" ) var resource = schema.GroupVersionResource{ @@ -74,13 +73,14 @@ type SnapController struct { //This function returns controller object with all required keys set to watch over lvmsnapshot object func newSnapController(kubeClient kubernetes.Interface, client dynamic.Interface, dynInformer dynamicinformer.DynamicSharedInformerFactory) *SnapController { + //Creating informer for lvmsnapshot resource snapInformer := dynInformer.ForResource(resource).Informer() + klog.Infof("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - klog.Infof("Creating lvm snapshot controller object") snapCtrller := &SnapController{ kubeclientset: kubeClient, diff --git a/pkg/mgmt/snapshot/snapshot.go b/pkg/mgmt/snapshot/snapshot.go index ba88e664..8bd37f43 100644 --- a/pkg/mgmt/snapshot/snapshot.go +++ b/pkg/mgmt/snapshot/snapshot.go @@ -18,6 +18,7 @@ package snapshot import ( "fmt" + "github.com/pkg/errors" apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" lvm "github.com/openebs/lvm-localpv/pkg/lvm" @@ -140,22 +141,21 @@ func (c *SnapController) deleteSnap(obj interface{}) { snap, ok := c.getStructuredObject(obj) if !ok { unstructuredObj, ok := obj.(*unstructured.Unstructured) - if ok { - tombStone := cache.DeletedFinalStateUnknown{} - err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) - if err != nil { - runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) - return - } - snap, ok = tombStone.Obj.(*apis.LVMSnapshot) - if !ok { - runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmvolume %#v", obj)) - return - } - } else { + if !ok { runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) return } + tombStone := cache.DeletedFinalStateUnknown{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + snap, ok = tombStone.Obj.(*apis.LVMSnapshot) + if !ok { + runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmsnapshot %#v", obj)) + return + } } if lvm.NodeID != snap.Spec.OwnerNodeID { @@ -169,17 +169,18 @@ func (c *SnapController) deleteSnap(obj interface{}) { //Obj from queue is not readily in lvmsnapshot type. This function would convert obj into lvmsnapshot type. func (c *SnapController) getStructuredObject(obj interface{}) (*apis.LVMSnapshot, bool) { unstructuredInterface, ok := obj.(*unstructured.Unstructured) - if ok { - snap := &apis.LVMSnapshot{} - err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &snap) - if err != nil { - fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) - return nil, false - } - return snap, true + if !ok { + runtime.HandleError(errors.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) + return nil, false } - runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) - return nil, false + snap := &apis.LVMSnapshot{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &snap) + if err != nil { + fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + return nil, false + } + return snap, true + } // Run will set up the event handlers for types we are interested in, as well @@ -250,7 +251,7 @@ func (c *SnapController) processNextWorkItem() bool { // Forget here else we'd go into a loop of attempting to // process a work item that is invalid. c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + runtime.HandleError(errors.Errorf("expected string in workqueue but got %#v", obj)) return nil } // Run the syncHandler, passing it the namespace/name string of the @@ -258,7 +259,7 @@ func (c *SnapController) processNextWorkItem() bool { if err := c.syncHandler(key); err != nil { // Put the item back on the workqueue to handle any transient errors. c.workqueue.AddRateLimited(key) - return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + return errors.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) } // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. diff --git a/pkg/mgmt/volume/builder.go b/pkg/mgmt/volume/builder.go index 6439ab5f..04b635eb 100644 --- a/pkg/mgmt/volume/builder.go +++ b/pkg/mgmt/volume/builder.go @@ -31,12 +31,11 @@ import ( "k8s.io/klog" ) -const controllerAgentName = "lvmvolume-controller" - const ( - GroupOpenebsIO = "local.openebs.io" - VersionV1alpha1 = "v1alpha1" - Resource = "lvmvolumes" + controllerAgentName = "lvmvolume-controller" + GroupOpenebsIO = "local.openebs.io" + VersionV1alpha1 = "v1alpha1" + Resource = "lvmvolumes" ) var resource = schema.GroupVersionResource{ diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index 7446eb36..11cb5ee8 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -18,6 +18,7 @@ package volume import ( "fmt" + "github.com/pkg/errors" "regexp" "sort" "strconv" @@ -109,22 +110,21 @@ func (c *VolController) deleteVol(obj interface{}) { Vol, ok := c.getStructuredObject(obj) if !ok { unstructuredObj, ok := obj.(*unstructured.Unstructured) - if ok { - tombStone := cache.DeletedFinalStateUnknown{} - err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) - if err != nil { - runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) - return - } - Vol, ok = tombStone.Obj.(*apis.LVMVolume) - if !ok { - runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmvolume %#v", obj)) - return - } - } else { + if !ok { runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) return } + tombStone := cache.DeletedFinalStateUnknown{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &tombStone) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + Vol, ok = tombStone.Obj.(*apis.LVMVolume) + if !ok { + runtime.HandleError(fmt.Errorf("tombstone contained object that is not a lvmvolume %#v", obj)) + return + } } if lvm.NodeID != Vol.Spec.OwnerNodeID { @@ -150,17 +150,17 @@ func (c *VolController) enqueueVol(obj interface{}) { //Obj from queue is not readily in lvmvol type. This function would convert obj into lvmvolume type. func (c *VolController) getStructuredObject(obj interface{}) (*apis.LVMVolume, bool) { unstructuredInterface, ok := obj.(*unstructured.Unstructured) - if ok { - vol := &apis.LVMVolume{} - err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &vol) - if err != nil { - klog.Infof("err %s, While converting unstructured obj to typed object\n", err.Error()) - return nil, false - } - return vol, true + if !ok { + runtime.HandleError(errors.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) + return nil, false + } + vol := &apis.LVMVolume{} + err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &vol) + if err != nil { + klog.Infof("err %s, While converting unstructured obj to typed object\n", err.Error()) + return nil, false } - runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) - return nil, false + return vol, true } // synVol is the function which tries to converge to a desired state for the From 285c4fdf884fe8e22d8bd5ba7d575dc84aa21916 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Wed, 1 Jun 2022 12:38:11 +0530 Subject: [PATCH 11/13] Incorporating review comments Signed-off-by: Abhilash Shetty --- pkg/mgmt/lvmnode/builder.go | 6 +++--- pkg/mgmt/lvmnode/lvmnode.go | 2 +- pkg/mgmt/snapshot/builder.go | 15 +++++++++------ pkg/mgmt/snapshot/snapshot.go | 12 +++++------- pkg/mgmt/volume/builder.go | 6 +++--- pkg/mgmt/volume/volume.go | 5 ++--- 6 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pkg/mgmt/lvmnode/builder.go b/pkg/mgmt/lvmnode/builder.go index 9f4725a4..aec8edee 100644 --- a/pkg/mgmt/lvmnode/builder.go +++ b/pkg/mgmt/lvmnode/builder.go @@ -40,7 +40,7 @@ const ( Resource = "lvmnodes" ) -var resource = schema.GroupVersionResource{ +var noderesource = schema.GroupVersionResource{ Group: GroupOpenebsIO, Version: VersionV1alpha1, Resource: Resource, @@ -82,7 +82,7 @@ type NodeController struct { //This function returns controller object with all required keys set to watch over lvmnode object func newNodeController(kubeClient kubernetes.Interface, client dynamic.Interface, dynInformer dynamicinformer.DynamicSharedInformerFactory, ownerRef metav1.OwnerReference) *NodeController { - nodeInformer := dynInformer.ForResource(resource).Informer() + nodeInformer := dynInformer.ForResource(noderesource).Informer() klog.Infof("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) @@ -92,7 +92,7 @@ func newNodeController(kubeClient kubernetes.Interface, client dynamic.Interface nodeContrller := &NodeController{ kubeclientset: kubeClient, clientset: client, - NodeLister: dynamiclister.New(nodeInformer.GetIndexer(), resource), + NodeLister: dynamiclister.New(nodeInformer.GetIndexer(), noderesource), NodeSynced: nodeInformer.HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Node"), recorder: recorder, diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go index ca77b2d4..7e685e56 100644 --- a/pkg/mgmt/lvmnode/lvmnode.go +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -134,7 +134,7 @@ func (c *NodeController) getStructuredObject(obj interface{}) (*apis.LVMNode, bo node := &apis.LVMNode{} err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &node) if err != nil { - fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + runtime.HandleError(fmt.Errorf("err %s, While converting unstructured obj to typed object\n", err.Error())) return nil, false } return node, true diff --git a/pkg/mgmt/snapshot/builder.go b/pkg/mgmt/snapshot/builder.go index 4e81af8d..cf857b78 100644 --- a/pkg/mgmt/snapshot/builder.go +++ b/pkg/mgmt/snapshot/builder.go @@ -29,6 +29,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" + "time" ) const ( @@ -38,7 +39,7 @@ const ( Resource = "lvmsnapshots" ) -var resource = schema.GroupVersionResource{ +var snapresource = schema.GroupVersionResource{ Group: GroupOpenebsIO, Version: VersionV1alpha1, Resource: Resource, @@ -74,8 +75,9 @@ type SnapController struct { func newSnapController(kubeClient kubernetes.Interface, client dynamic.Interface, dynInformer dynamicinformer.DynamicSharedInformerFactory) *SnapController { //Creating informer for lvmsnapshot resource - snapInformer := dynInformer.ForResource(resource).Informer() - + snapInformer := dynInformer.ForResource(snapresource).Informer() + klog.Infoln("Using new rate limiter") + rateLimiter := workqueue.NewItemFastSlowRateLimiter(5*time.Second, 30*time.Second, 12) klog.Infof("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) @@ -85,10 +87,11 @@ func newSnapController(kubeClient kubernetes.Interface, client dynamic.Interface snapCtrller := &SnapController{ kubeclientset: kubeClient, clientset: client, - snapLister: dynamiclister.New(snapInformer.GetIndexer(), resource), + snapLister: dynamiclister.New(snapInformer.GetIndexer(), snapresource), snapSynced: snapInformer.HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Snap"), - recorder: recorder, + //workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Snap"), + workqueue: workqueue.NewNamedRateLimitingQueue(rateLimiter, "Snap"), + recorder: recorder, } klog.Infof("Adding Event handler functions for lvm snapshot controller") snapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/mgmt/snapshot/snapshot.go b/pkg/mgmt/snapshot/snapshot.go index 8bd37f43..cf6d2b0a 100644 --- a/pkg/mgmt/snapshot/snapshot.go +++ b/pkg/mgmt/snapshot/snapshot.go @@ -18,8 +18,6 @@ package snapshot import ( "fmt" - "github.com/pkg/errors" - apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" lvm "github.com/openebs/lvm-localpv/pkg/lvm" k8serror "k8s.io/apimachinery/pkg/api/errors" @@ -59,7 +57,7 @@ func (c *SnapController) syncHandler(key string) error { snap := apis.LVMSnapshot{} err = runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredSnap.UnstructuredContent(), &snap) if err != nil { - fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + klog.Infof("err %s, While converting unstructured obj to typed object\n", err.Error()) } snapCopy := snap.DeepCopy() err = c.syncSnap(snapCopy) @@ -170,13 +168,13 @@ func (c *SnapController) deleteSnap(obj interface{}) { func (c *SnapController) getStructuredObject(obj interface{}) (*apis.LVMSnapshot, bool) { unstructuredInterface, ok := obj.(*unstructured.Unstructured) if !ok { - runtime.HandleError(errors.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) + runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) return nil, false } snap := &apis.LVMSnapshot{} err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &snap) if err != nil { - fmt.Printf("err %s, While converting unstructured obj to typed object\n", err.Error()) + runtime.HandleError(fmt.Errorf("err %s, While converting unstructured obj to typed object\n", err.Error())) return nil, false } return snap, true @@ -251,7 +249,7 @@ func (c *SnapController) processNextWorkItem() bool { // Forget here else we'd go into a loop of attempting to // process a work item that is invalid. c.workqueue.Forget(obj) - runtime.HandleError(errors.Errorf("expected string in workqueue but got %#v", obj)) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } // Run the syncHandler, passing it the namespace/name string of the @@ -259,7 +257,7 @@ func (c *SnapController) processNextWorkItem() bool { if err := c.syncHandler(key); err != nil { // Put the item back on the workqueue to handle any transient errors. c.workqueue.AddRateLimited(key) - return errors.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) } // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. diff --git a/pkg/mgmt/volume/builder.go b/pkg/mgmt/volume/builder.go index 04b635eb..18c53a72 100644 --- a/pkg/mgmt/volume/builder.go +++ b/pkg/mgmt/volume/builder.go @@ -38,7 +38,7 @@ const ( Resource = "lvmvolumes" ) -var resource = schema.GroupVersionResource{ +var volresource = schema.GroupVersionResource{ Group: GroupOpenebsIO, Version: VersionV1alpha1, Resource: Resource, @@ -74,7 +74,7 @@ type VolController struct { func newVolController(kubeClient kubernetes.Interface, client dynamic.Interface, dynInformer dynamicinformer.DynamicSharedInformerFactory) *VolController { - volInformer := dynInformer.ForResource(resource).Informer() + volInformer := dynInformer.ForResource(volresource).Informer() klog.Infof("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) @@ -85,7 +85,7 @@ func newVolController(kubeClient kubernetes.Interface, client dynamic.Interface, volCtrller := &VolController{ kubeclientset: kubeClient, clientset: client, - VolLister: dynamiclister.New(volInformer.GetIndexer(), resource), + VolLister: dynamiclister.New(volInformer.GetIndexer(), volresource), VolSynced: volInformer.HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Vol"), recorder: recorder, diff --git a/pkg/mgmt/volume/volume.go b/pkg/mgmt/volume/volume.go index 11cb5ee8..4a2d196c 100644 --- a/pkg/mgmt/volume/volume.go +++ b/pkg/mgmt/volume/volume.go @@ -18,7 +18,6 @@ package volume import ( "fmt" - "github.com/pkg/errors" "regexp" "sort" "strconv" @@ -151,13 +150,13 @@ func (c *VolController) enqueueVol(obj interface{}) { func (c *VolController) getStructuredObject(obj interface{}) (*apis.LVMVolume, bool) { unstructuredInterface, ok := obj.(*unstructured.Unstructured) if !ok { - runtime.HandleError(errors.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) + runtime.HandleError(fmt.Errorf("couldnt type assert obj: %#v to unstructured obj", obj)) return nil, false } vol := &apis.LVMVolume{} err := runtimenew.DefaultUnstructuredConverter.FromUnstructured(unstructuredInterface.UnstructuredContent(), &vol) if err != nil { - klog.Infof("err %s, While converting unstructured obj to typed object\n", err.Error()) + runtime.HandleError(fmt.Errorf("err %s, While converting unstructured obj to typed object\n", err.Error())) return nil, false } return vol, true From 8e688eb3d3db9d54eca5ed24097c9357f12fb9de Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Wed, 1 Jun 2022 17:43:28 +0530 Subject: [PATCH 12/13] Sorted imports Signed-off-by: Abhilash Shetty --- pkg/mgmt/lvmnode/builder.go | 3 ++- pkg/mgmt/lvmnode/lvmnode.go | 5 +++-- pkg/mgmt/lvmnode/start.go | 5 +++-- pkg/mgmt/snapshot/builder.go | 3 ++- pkg/mgmt/snapshot/snapshot.go | 3 ++- pkg/mgmt/snapshot/start.go | 5 +++-- 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pkg/mgmt/lvmnode/builder.go b/pkg/mgmt/lvmnode/builder.go index aec8edee..c2405882 100644 --- a/pkg/mgmt/lvmnode/builder.go +++ b/pkg/mgmt/lvmnode/builder.go @@ -17,6 +17,8 @@ package lvmnode import ( + "time" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -30,7 +32,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "time" ) const ( diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go index 7e685e56..af113308 100644 --- a/pkg/mgmt/lvmnode/lvmnode.go +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -18,11 +18,12 @@ package lvmnode import ( "fmt" + "reflect" + "time" + "github.com/openebs/lib-csi/pkg/common/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" runtimenew "k8s.io/apimachinery/pkg/runtime" - "reflect" - "time" apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" "github.com/openebs/lvm-localpv/pkg/builder/nodebuilder" diff --git a/pkg/mgmt/lvmnode/start.go b/pkg/mgmt/lvmnode/start.go index b0b19126..6a8020e1 100644 --- a/pkg/mgmt/lvmnode/start.go +++ b/pkg/mgmt/lvmnode/start.go @@ -18,6 +18,9 @@ package lvmnode import ( "context" + "sync" + "time" + k8sapi "github.com/openebs/lib-csi/pkg/client/k8s" "github.com/openebs/lvm-localpv/pkg/lvm" "github.com/pkg/errors" @@ -28,8 +31,6 @@ import ( "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" "k8s.io/klog" - "sync" - "time" ) // Start starts the lvmnode controller. diff --git a/pkg/mgmt/snapshot/builder.go b/pkg/mgmt/snapshot/builder.go index cf857b78..aaa0883e 100644 --- a/pkg/mgmt/snapshot/builder.go +++ b/pkg/mgmt/snapshot/builder.go @@ -17,6 +17,8 @@ limitations under the License. package snapshot import ( + "time" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -29,7 +31,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "time" ) const ( diff --git a/pkg/mgmt/snapshot/snapshot.go b/pkg/mgmt/snapshot/snapshot.go index cf6d2b0a..e544c08f 100644 --- a/pkg/mgmt/snapshot/snapshot.go +++ b/pkg/mgmt/snapshot/snapshot.go @@ -18,6 +18,8 @@ package snapshot import ( "fmt" + "time" + apis "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1" lvm "github.com/openebs/lvm-localpv/pkg/lvm" k8serror "k8s.io/apimachinery/pkg/api/errors" @@ -27,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/klog" - "time" ) // isDeletionCandidate checks if a lvm snapshot is a deletion candidate. diff --git a/pkg/mgmt/snapshot/start.go b/pkg/mgmt/snapshot/start.go index 26478cb8..30c56da5 100644 --- a/pkg/mgmt/snapshot/start.go +++ b/pkg/mgmt/snapshot/start.go @@ -17,6 +17,9 @@ limitations under the License. package snapshot import ( + "sync" + "time" + "github.com/pkg/errors" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" @@ -24,8 +27,6 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" - "sync" - "time" ) var ( From e8e9511130a6eff26e2fbc1008e0023e74993fe0 Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Thu, 2 Jun 2022 10:57:33 +0530 Subject: [PATCH 13/13] Incorporating review comments Signed-off-by: Abhilash Shetty --- pkg/mgmt/lvmnode/lvmnode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mgmt/lvmnode/lvmnode.go b/pkg/mgmt/lvmnode/lvmnode.go index af113308..d787a101 100644 --- a/pkg/mgmt/lvmnode/lvmnode.go +++ b/pkg/mgmt/lvmnode/lvmnode.go @@ -47,7 +47,7 @@ func (c *NodeController) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - runtime.HandleError(errors.Errorf("invalid resource key: %s", key)) + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil }