@@ -2,33 +2,37 @@ package bucketclaim
22
33import (
44 "context"
5+ "fmt"
56
67 v1 "k8s.io/api/core/v1"
7- "k8s.io/apimachinery/pkg/api/errors"
8+ kubeerrors "k8s.io/apimachinery/pkg/api/errors"
89 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+ "k8s.io/apimachinery/pkg/runtime"
911 kubeclientset "k8s.io/client-go/kubernetes"
12+ "k8s.io/client-go/tools/record"
1013 "k8s.io/klog/v2"
11- "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
12-
1314 "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage/v1alpha1"
1415 bucketclientset "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned"
1516 objectstoragev1alpha1 "sigs.k8s.io/container-object-storage-interface-api/client/clientset/versioned/typed/objectstorage/v1alpha1"
16-
17+ "sigs.k8s.io/container-object-storage-interface-api/controller/events"
1718 "sigs.k8s.io/container-object-storage-interface-controller/pkg/util"
19+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1820)
1921
20- // bucketClaimListener is a resource handler for bucket requests objects
21- type bucketClaimListener struct {
22+ // BucketClaimListener is a resource handler for bucket requests objects
23+ type BucketClaimListener struct {
24+ eventRecorder record.EventRecorder
25+
2226 kubeClient kubeclientset.Interface
2327 bucketClient bucketclientset.Interface
2428}
2529
26- func NewBucketClaimListener () * bucketClaimListener {
27- return & bucketClaimListener {}
30+ func NewBucketClaimListener () * BucketClaimListener {
31+ return & BucketClaimListener {}
2832}
2933
3034// Add creates a bucket in response to a bucketClaim
31- func (b * bucketClaimListener ) Add (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
35+ func (b * BucketClaimListener ) Add (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
3236 klog .V (3 ).InfoS ("Add BucketClaim" ,
3337 "name" , bucketClaim .ObjectMeta .Name ,
3438 "ns" , bucketClaim .ObjectMeta .Namespace ,
@@ -39,7 +43,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
3943 if err != nil {
4044 switch err {
4145 case util .ErrInvalidBucketClass :
42- klog .V (3 ).ErrorS (util . ErrInvalidBucketClass ,
46+ klog .V (3 ).ErrorS (err ,
4347 "bucketClaim" , bucketClaim .ObjectMeta .Name ,
4448 "ns" , bucketClaim .ObjectMeta .Namespace ,
4549 "bucketClassName" , bucketClaim .Spec .BucketClassName )
@@ -65,7 +69,7 @@ func (b *bucketClaimListener) Add(ctx context.Context, bucketClaim *v1alpha1.Buc
6569}
6670
6771// update processes any updates made to the bucket request
68- func (b * bucketClaimListener ) Update (ctx context.Context , old , new * v1alpha1.BucketClaim ) error {
72+ func (b * BucketClaimListener ) Update (ctx context.Context , old , new * v1alpha1.BucketClaim ) error {
6973 klog .V (3 ).InfoS ("Update BucketClaim" ,
7074 "name" , old .Name ,
7175 "ns" , old .Namespace )
@@ -80,7 +84,7 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
8084 klog .V (3 ).ErrorS (err , "Error deleting bucket" ,
8185 "bucket" , bucketName ,
8286 "bucketClaim" , bucketClaim .ObjectMeta .Name )
83- return err
87+ return b . recordError ( bucketClaim , v1 . EventTypeWarning , events . FailedDeleteBucket , err )
8488 }
8589
8690 klog .V (5 ).Infof ("Successfully deleted bucket: %s from bucketClaim: %s" , bucketName , bucketClaim .ObjectMeta .Name )
@@ -94,22 +98,22 @@ func (b *bucketClaimListener) Update(ctx context.Context, old, new *v1alpha1.Buc
9498}
9599
96100// Delete processes a bucket for which bucket request is deleted
97- func (b * bucketClaimListener ) Delete (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
98- klog .V (3 ).Info ("Delete BucketClaim" ,
101+ func (b * BucketClaimListener ) Delete (ctx context.Context , bucketClaim * v1alpha1.BucketClaim ) error {
102+ klog .V (3 ).InfoS ("Delete BucketClaim" ,
99103 "name" , bucketClaim .ObjectMeta .Name ,
100104 "ns" , bucketClaim .ObjectMeta .Namespace )
101105
102106 return nil
103107}
104108
105109// provisionBucketClaimOperation attempts to provision a bucket for a given bucketClaim.
106- // Return values
107110//
108- // nil - BucketClaim successfully processed
109- // ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
110- // ErrBucketAlreadyExists - BucketClaim already processed
111- // non-nil err - Internal error [requeue'd with exponential backoff]
112- func (b * bucketClaimListener ) provisionBucketClaimOperation (ctx context.Context , inputBucketClaim * v1alpha1.BucketClaim ) error {
111+ // Return values
112+ // - nil - BucketClaim successfully processed
113+ // - ErrInvalidBucketClass - BucketClass does not exist [requeue'd with exponential backoff]
114+ // - ErrBucketAlreadyExists - BucketClaim already processed
115+ // - non-nil err - Internal error [requeue'd with exponential backoff]
116+ func (b * BucketClaimListener ) provisionBucketClaimOperation (ctx context.Context , inputBucketClaim * v1alpha1.BucketClaim ) error {
113117 bucketClaim := inputBucketClaim .DeepCopy ()
114118 if bucketClaim .Status .BucketReady {
115119 return util .ErrBucketAlreadyExists
@@ -121,9 +125,11 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
121125 if bucketClaim .Spec .ExistingBucketName != "" {
122126 bucketName = bucketClaim .Spec .ExistingBucketName
123127 bucket , err := b .buckets ().Get (ctx , bucketName , metav1.GetOptions {})
124- if err != nil {
128+ if kubeerrors .IsNotFound (err ) {
129+ return b .recordError (inputBucketClaim , v1 .EventTypeWarning , events .FailedCreateBucket , err )
130+ } else if err != nil {
125131 klog .V (3 ).ErrorS (err , "Get Bucket with ExistingBucketName error" , "name" , bucketClaim .Spec .ExistingBucketName )
126- return err
132+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
127133 }
128134
129135 bucket .Spec .BucketClaim = & v1.ObjectReference {
@@ -141,21 +147,23 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
141147 klog .V (3 ).ErrorS (err , "Error updating existing bucket" ,
142148 "bucket" , bucket .ObjectMeta .Name ,
143149 "bucketClaim" , bucketClaim .ObjectMeta .Name )
144- return err
150+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
145151 }
146152
147153 bucketClaim .Status .BucketName = bucketName
148154 bucketClaim .Status .BucketReady = true
149155 } else {
150156 bucketClassName := bucketClaim .Spec .BucketClassName
151157 if bucketClassName == "" {
152- return util .ErrInvalidBucketClass
158+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , util .ErrInvalidBucketClass )
153159 }
154160
155161 bucketClass , err := b .bucketClasses ().Get (ctx , bucketClassName , metav1.GetOptions {})
156- if err != nil {
162+ if kubeerrors .IsNotFound (err ) {
163+ return b .recordError (inputBucketClaim , v1 .EventTypeWarning , events .FailedCreateBucket , err )
164+ } else if err != nil {
157165 klog .V (3 ).ErrorS (err , "Get Bucketclass Error" , "name" , bucketClassName )
158- return util . ErrInvalidBucketClass
166+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
159167 }
160168
161169 bucketName = bucketClassName + string (bucketClaim .ObjectMeta .UID )
@@ -180,11 +188,11 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
180188
181189 bucket .Spec .Protocols = protocolCopy
182190 bucket , err = b .buckets ().Create (ctx , bucket , metav1.CreateOptions {})
183- if err != nil && ! errors .IsAlreadyExists (err ) {
191+ if err != nil && ! kubeerrors .IsAlreadyExists (err ) {
184192 klog .V (3 ).ErrorS (err , "Error creationg bucket" ,
185193 "bucket" , bucketName ,
186194 "bucketClaim" , bucketClaim .ObjectMeta .Name )
187- return err
195+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
188196 }
189197
190198 bucketClaim .Status .BucketName = bucketName
@@ -196,7 +204,7 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
196204 bucketClaim , err = b .bucketClaims (bucketClaim .ObjectMeta .Namespace ).UpdateStatus (ctx , bucketClaim , metav1.UpdateOptions {})
197205 if err != nil {
198206 klog .V (3 ).ErrorS (err , "Failed to update status of BucketClaim" , "name" , bucketClaim .ObjectMeta .Name )
199- return err
207+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
200208 }
201209
202210 // Add the finalizers so that bucketClaim is deleted
@@ -205,38 +213,63 @@ func (b *bucketClaimListener) provisionBucketClaimOperation(ctx context.Context,
205213 _ , err = b .bucketClaims (bucketClaim .ObjectMeta .Namespace ).Update (ctx , bucketClaim , metav1.UpdateOptions {})
206214 if err != nil {
207215 klog .V (3 ).ErrorS (err , "Failed to add finalizer BucketClaim" , "name" , bucketClaim .ObjectMeta .Name )
208- return err
216+ return b . recordError ( inputBucketClaim , v1 . EventTypeWarning , events . FailedCreateBucket , err )
209217 }
210218
211219 klog .V (3 ).Infof ("Finished creating Bucket %v" , bucketName )
212220 return nil
213221}
214222
215- func (b * bucketClaimListener ) InitializeKubeClient (k kubeclientset.Interface ) {
223+ // InitializeKubeClient initializes the kubernetes client
224+ func (b * BucketClaimListener ) InitializeKubeClient (k kubeclientset.Interface ) {
216225 b .kubeClient = k
217226}
218227
219- func (b * bucketClaimListener ) InitializeBucketClient (bc bucketclientset.Interface ) {
228+ // InitializeBucketClient initializes the object storage bucket client
229+ func (b * BucketClaimListener ) InitializeBucketClient (bc bucketclientset.Interface ) {
220230 b .bucketClient = bc
221231}
222232
223- func (b * bucketClaimListener ) buckets () objectstoragev1alpha1.BucketInterface {
233+ // InitializeEventRecorder initializes the event recorder
234+ func (b * BucketClaimListener ) InitializeEventRecorder (er record.EventRecorder ) {
235+ b .eventRecorder = er
236+ }
237+
238+ func (b * BucketClaimListener ) buckets () objectstoragev1alpha1.BucketInterface {
224239 if b .bucketClient != nil {
225240 return b .bucketClient .ObjectstorageV1alpha1 ().Buckets ()
226241 }
227242 panic ("uninitialized listener" )
228243}
229244
230- func (b * bucketClaimListener ) bucketClasses () objectstoragev1alpha1.BucketClassInterface {
245+ func (b * BucketClaimListener ) bucketClasses () objectstoragev1alpha1.BucketClassInterface {
231246 if b .bucketClient != nil {
232247 return b .bucketClient .ObjectstorageV1alpha1 ().BucketClasses ()
233248 }
234249 panic ("uninitialized listener" )
235250}
236251
237- func (b * bucketClaimListener ) bucketClaims (namespace string ) objectstoragev1alpha1.BucketClaimInterface {
252+ func (b * BucketClaimListener ) bucketClaims (namespace string ) objectstoragev1alpha1.BucketClaimInterface {
238253 if b .bucketClient != nil {
239254 return b .bucketClient .ObjectstorageV1alpha1 ().BucketClaims (namespace )
240255 }
241256 panic ("uninitialized listener" )
242257}
258+
259+ // recordError during the processing of the objects
260+ func (b * BucketClaimListener ) recordError (subject runtime.Object , eventtype , reason string , err error ) error {
261+ if b .eventRecorder == nil {
262+ return err
263+ }
264+ b .eventRecorder .Event (subject , eventtype , reason , err .Error ())
265+
266+ return err
267+ }
268+
269+ // recordEvent during the processing of the objects
270+ func (b * BucketClaimListener ) recordEvent (subject runtime.Object , eventtype , reason , message string , args ... any ) {
271+ if b .eventRecorder == nil {
272+ return
273+ }
274+ b .eventRecorder .Event (subject , eventtype , reason , fmt .Sprintf (message , args ... ))
275+ }
0 commit comments