Skip to content

Commit

Permalink
Create objectbucketclaim for each client
Browse files Browse the repository at this point in the history
Signed-off-by: vbadrina <vbadrina@redhat.com>
  • Loading branch information
vbnrh committed Sep 9, 2024
1 parent 52f1a92 commit 24a3637
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 53 deletions.
71 changes: 19 additions & 52 deletions addons/agent_mirrorpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@ package addons

import (
"context"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"log/slog"
"strconv"
"time"

obv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
"github.com/red-hat-storage/odf-multicluster-orchestrator/controllers/utils"
Expand Down Expand Up @@ -310,59 +307,27 @@ func (r *MirrorPeerReconciler) labelRBDStorageClasses(ctx context.Context, stora
}

func (r *MirrorPeerReconciler) createS3(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer, scNamespace string) error {
noobaaOBC, err := r.getS3bucket(ctx, mirrorPeer, scNamespace)
if err != nil {
if errors.IsNotFound(err) {
r.Logger.Info("ODR ObjectBucketClaim not found, creating new one", "MirrorPeer", mirrorPeer.Name, "namespace", scNamespace)
err = r.SpokeClient.Create(ctx, noobaaOBC)
if err != nil {
r.Logger.Error("Failed to create ODR ObjectBucketClaim", "error", err, "MirrorPeer", mirrorPeer.Name, "namespace", scNamespace)
return err
}
logger := r.Logger.With("MirrorPeer", mirrorPeer.Name)
bucketCount := 1
if utils.IsStorageClientType(mirrorPeer.Spec.Items) {
bucketCount = 2
}
for index := 0; index < bucketCount; index++ {
bucketNamespace := utils.GetEnv("ODR_NAMESPACE", scNamespace)
var bucketName string
if utils.IsStorageClientType(mirrorPeer.Spec.Items) {
bucketName = utils.GenerateBucketName(mirrorPeer, mirrorPeer.Spec.Items[index].StorageClusterRef.Name)
} else {
r.Logger.Error("Failed to retrieve ODR ObjectBucketClaim", "error", err, "MirrorPeer", mirrorPeer.Name, "namespace", scNamespace)
bucketName = utils.GenerateBucketName(mirrorPeer)
}
operationResult, err := utils.CreateOrUpdateObjectBucketClaim(ctx, r.SpokeClient, bucketName, bucketNamespace)
if err != nil {
return err
}
} else {
r.Logger.Info("ODR ObjectBucketClaim already exists, no action needed", "MirrorPeer", mirrorPeer.Name, "namespace", scNamespace)
}
return nil
}

func (r *MirrorPeerReconciler) getS3bucket(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer, scNamespace string) (*obv1alpha1.ObjectBucketClaim, error) {
var peerAccumulator string
for _, peer := range mirrorPeer.Spec.Items {
peerAccumulator += peer.ClusterName
logger.Info(fmt.Sprintf("ObjectBucketClaim %s was %s in namespace %s", bucketName, operationResult, bucketNamespace))
}
checksum := sha1.Sum([]byte(peerAccumulator))

bucketGenerateName := utils.BucketGenerateName
// truncate to bucketGenerateName + "-" + first 12 (out of 20) byte representations of sha1 checksum
bucket := fmt.Sprintf("%s-%s", bucketGenerateName, hex.EncodeToString(checksum[:]))[0 : len(bucketGenerateName)+1+12]
namespace := utils.GetEnv("ODR_NAMESPACE", scNamespace)

noobaaOBC := &obv1alpha1.ObjectBucketClaim{
ObjectMeta: metav1.ObjectMeta{
Name: bucket,
Namespace: namespace,
},
Spec: obv1alpha1.ObjectBucketClaimSpec{
BucketName: bucket,
StorageClassName: namespace + ".noobaa.io",
},
}

err := r.SpokeClient.Get(ctx, types.NamespacedName{Name: bucket, Namespace: namespace}, noobaaOBC)
if err != nil {
if errors.IsNotFound(err) {
r.Logger.Info("ObjectBucketClaim not found, will be created", "bucket", bucket, "namespace", namespace)
} else {
r.Logger.Error("Failed to get ObjectBucketClaim", "error", err, "bucket", bucket, "namespace", namespace)
}
} else {
r.Logger.Info("ObjectBucketClaim retrieved successfully", "bucket", bucket, "namespace", namespace)
}
return noobaaOBC, err
return nil
}

// enableMirroring is a wrapper function around toggleMirroring to enable mirroring in a storage cluster
Expand Down Expand Up @@ -548,7 +513,9 @@ func (r *MirrorPeerReconciler) deleteGreenSecret(ctx context.Context, spokeClust
// deleteS3 deletes the S3 bucket in the storage cluster namespace, each new mirrorpeer generates
// a new bucket, so we do not need to check if the bucket is being used by another mirrorpeer
func (r *MirrorPeerReconciler) deleteS3(ctx context.Context, mirrorPeer multiclusterv1alpha1.MirrorPeer, scNamespace string) error {
noobaaOBC, err := r.getS3bucket(ctx, mirrorPeer, scNamespace)
bucketName := utils.GenerateBucketName(mirrorPeer)
bucketNamespace := utils.GetEnv("ODR_NAMESPACE", scNamespace)
noobaaOBC, err := utils.GetObjectBucketClaim(ctx, r.SpokeClient, bucketName, bucketNamespace)
if err != nil {
if errors.IsNotFound(err) {
r.Logger.Info("ODR ObjectBucketClaim not found, skipping deletion", "namespace", scNamespace, "MirrorPeer", mirrorPeer.Name)
Expand Down
2 changes: 1 addition & 1 deletion addons/agent_mirrorpeer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestDeleteGreenSecret(t *testing.T) {
}

func TestDeleteS3(t *testing.T) {
bucketName := "odrbucket-b1b922184baf"
bucketName := utils.GenerateBucketName(mirrorPeer)
ctx := context.TODO()
scheme := mgrScheme
fakeHubClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&mirrorpeer1).Build()
Expand Down
17 changes: 17 additions & 0 deletions controllers/utils/hash.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package utils

import (
"crypto/sha1"
"crypto/sha512"
"encoding/hex"
"fmt"
"hash/fnv"
"sort"
"strings"

multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
)

/*
Expand Down Expand Up @@ -51,3 +55,16 @@ func CreateUniqueReplicationId(clusterFSIDs map[string]string) (string, error) {
sort.Strings(fsids)
return CreateUniqueName(fsids...)[0:39], nil
}

func GenerateUniqueIdForMirrorPeer(mirrorPeer multiclusterv1alpha1.MirrorPeer) string {
var peerAccumulator []string

for _, peer := range mirrorPeer.Spec.Items {
peerAccumulator = append(peerAccumulator, peer.ClusterName)
}

sort.Strings(peerAccumulator)

checksum := sha1.Sum([]byte(strings.Join(peerAccumulator, "-")))
return hex.EncodeToString(checksum[:])
}
53 changes: 53 additions & 0 deletions controllers/utils/s3.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package utils

import (
"context"
"fmt"
"os"

obv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
multiclusterv1alpha1 "github.com/red-hat-storage/odf-multicluster-orchestrator/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
Expand Down Expand Up @@ -43,3 +48,51 @@ func GetEnv(key, defaultValue string) string {
}
return defaultValue
}

func GenerateBucketName(mirrorPeer multiclusterv1alpha1.MirrorPeer, clientName ...string) string {
mirrorPeerId := GenerateUniqueIdForMirrorPeer(mirrorPeer)
bucketGenerateName := BucketGenerateName
if len(clientName) > 0 && clientName[0] != "" {
bucketGenerateName = fmt.Sprintf("%s-%s", BucketGenerateName, clientName[0])
}

return fmt.Sprintf("%s-%s", bucketGenerateName, mirrorPeerId)
}

func CreateOrUpdateObjectBucketClaim(ctx context.Context, c client.Client, bucketName, bucketNamespace string) (controllerutil.OperationResult, error) {
noobaaOBC := &obv1alpha1.ObjectBucketClaim{
ObjectMeta: metav1.ObjectMeta{
Name: bucketName,
Namespace: bucketNamespace,
},
}

operationResult, err := controllerutil.CreateOrUpdate(ctx, c, noobaaOBC, func() error {
noobaaOBC.Spec = obv1alpha1.ObjectBucketClaimSpec{
BucketName: bucketName,
StorageClassName: fmt.Sprintf("%s.noobaa.io", bucketNamespace),
}

return nil
})

if err != nil {
return controllerutil.OperationResultNone, fmt.Errorf("failed to create or update ObjectBucketClaim %s/%s: %w", bucketNamespace, bucketName, err)
}

return operationResult, nil
}

func GetObjectBucketClaim(ctx context.Context, c client.Client, bucketName, bucketNamespace string) (*obv1alpha1.ObjectBucketClaim, error) {
noobaaOBC := &obv1alpha1.ObjectBucketClaim{}
err := c.Get(ctx, client.ObjectKey{
Name: bucketName,
Namespace: bucketNamespace,
}, noobaaOBC)

if err != nil {
return nil, fmt.Errorf("failed to fetch ObjectBucketClaim %s/%s: %w", bucketNamespace, bucketName, err)
}

return noobaaOBC, nil
}

0 comments on commit 24a3637

Please sign in to comment.