Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
skdwriting authored Sep 21, 2020
2 parents bed063e + 3d37481 commit 8e8430a
Show file tree
Hide file tree
Showing 54 changed files with 4,775 additions and 114 deletions.
18 changes: 18 additions & 0 deletions contrib/datastore/file/gcp/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,22 @@ const (

// The IPv4 internet protocol mode for GCP.
InternetProtocolModeIpv4 = "MODE_IPV4"

// The Unique name of the instance resource in GCP.
InstanceResourceName = "InstanceResourceName"

// The time that the file system was created in GCP.
CreationTimeAtBackend = "CreationTimeAtBackend"

// Server-specified ETag in GCP.
Etag = "Etag"

// Networks: VPC networks to which the instance is connected, in GCP.
Networks = "Networks"

// FileStore Instance State, in GCP.
State = "State"

// FileStore Instance Status about its State, in GCP.
StatusMessage = "StatusMessage"
)
230 changes: 222 additions & 8 deletions contrib/datastore/file/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ package gcp
import (
"context"
"fmt"
"time"

"github.com/micro/go-micro/v2/util/log"
"github.com/opensds/multi-cloud/contrib/utils"
"k8s.io/apimachinery/pkg/util/wait"

backendpb "github.com/opensds/multi-cloud/backend/proto"
file "github.com/opensds/multi-cloud/file/proto"
Expand All @@ -34,8 +36,14 @@ import (

const (
locationURIFmt = "projects/%s/locations/%s"
instanceURIFmt = locationURIFmt + "/instances/%s"

// GCP FS Patch update masks
fileShareUpdateMask = "file_shares"
)

var createOp, updateOp, deleteOp *gcpfilev1.Operation

type GcpAdapter struct {
backend *backendpb.BackendDetail
fileService *gcpfilev1.Service
Expand All @@ -44,7 +52,7 @@ type GcpAdapter struct {
}

func (g GcpAdapter) CreateFileShare(ctx context.Context, fs *file.CreateFileShareRequest) (*file.CreateFileShareResponse, error) {
instanceName := fs.Fileshare.Name + "-instance"
instanceId := g.getInstanceId(fs.Fileshare.Name)
labels := make(map[string]string)
for _, tag := range fs.Fileshare.Tags {
labels[tag.Key] = tag.Value
Expand All @@ -69,14 +77,15 @@ func (g GcpAdapter) CreateFileShare(ctx context.Context, fs *file.CreateFileShar
}

log.Infof("Starting CreateInstance on cloud backend")
log.Debugf("Creating instance %s: location %s, tier %s, capacity %s, labels %v", instanceName,
log.Debugf("Creating instance %s: location %s, tier %s, capacity %s, labels %v", instanceId,
fs.Fileshare.AvailabilityZone, instance.Tier, instance.FileShares[0].CapacityGb, instance.Labels)

op, err := g.instancesService.Create(g.locationURI(g.backend.Region,
fs.Fileshare.AvailabilityZone), instance).InstanceId(instanceName).Context(ctx).Do()
fs.Fileshare.AvailabilityZone), instance).InstanceId(instanceId).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("createInstance operation failed: %v", err)
}
createOp = op
log.Infof("Create File share, Operation Resource: %s submitted to cloud backend", op.Name)

return &file.CreateFileShareResponse{
Expand All @@ -85,27 +94,232 @@ func (g GcpAdapter) CreateFileShare(ctx context.Context, fs *file.CreateFileShar
}

func (g GcpAdapter) GetFileShare(ctx context.Context, fs *file.GetFileShareRequest) (*file.GetFileShareResponse, error) {
panic("implement me")
isCreateOpDone, err := g.isOperationDone(createOp)
if g.checkForOperation(ctx, isCreateOpDone, createOp, "create", err) != nil {
log.Error(err)
return nil, err
}

isUpdateOpDone, err := g.isOperationDone(updateOp)
if g.checkForOperation(ctx, isUpdateOpDone, updateOp, "update", err) != nil {
log.Error(err)
return nil, err
}

isDeleteOpDone, err := g.isOperationDone(deleteOp)
if g.checkForOperation(ctx, isDeleteOpDone, deleteOp, "delete", err) != nil {
log.Error(err)
return nil, err
}

instance, err := g.GetInstance(ctx, fs.Fileshare)
if err != nil {
log.Error(err)
return nil, err
}
log.Debugf("Found Instance %+v", instance)

fileShare, err := g.ParseFileShare(instance)
if err != nil {
log.Error(err)
return nil, err
}

if instance.State == "READY" {
// GCP FileStore Instance exists & ready to use
fileShare.Status = "available"
return &file.GetFileShareResponse{
Fileshare: fileShare,
}, nil
} else {
// GCP FileStore Instance exists but is not ready to use
return &file.GetFileShareResponse{
Fileshare: fileShare,
}, fmt.Errorf("instance %s is in %s", instance.Name, instance.State)
}
}

func (g GcpAdapter) ListFileShare(ctx context.Context, fs *file.ListFileShareRequest) (*file.ListFileShareResponse, error) {
panic("implement me")
instances, err := g.instancesService.List(g.locationURI(g.backend.Region, "-")).Context(ctx).Do()
if err != nil {
log.Error(err)
return nil, err
}

var fileshares []*file.FileShare
for _, instance := range instances.Instances {
fs, err := g.ParseFileShare(instance)
if err != nil {
log.Error(err)
return nil, err
}
fileshares = append(fileshares, fs)
}

log.Debugf("List File shares = %+v", fileshares)

return &file.ListFileShareResponse{
Fileshares: fileshares,
}, nil
}

func (g GcpAdapter) UpdatefileShare(ctx context.Context, fs *file.UpdateFileShareRequest) (*file.UpdateFileShareResponse, error) {
panic("implement me")
instance, err := g.GetInstance(ctx, fs.Fileshare)
if err != nil {
log.Error(err)
return nil, err
}
log.Debugf("Found Instance %+v", instance)

instanceId := g.getInstanceId(fs.Fileshare.Name)
instanceUri := g.instanceURI(g.backend.Region, fs.Fileshare.AvailabilityZone, instanceId)


// Create a file instance for the Patch request.
instance = &gcpfilev1.Instance{
Tier: fs.Fileshare.Metadata.Fields["Tier"].GetStringValue(),
FileShares: []*gcpfilev1.FileShareConfig{
{
Name: fs.Fileshare.Name,
CapacityGb: fs.Fileshare.Size / utils.GB_FACTOR,
},
},
Networks: []*gcpfilev1.NetworkConfig{
{
Network: "default",
Modes: []string{"MODE_IPV4"},
},
},
}

log.Info("Starting PatchInstance operation on cloud backend")
log.Debugf("Patching instance %v: location %v, tier %v, capacity %v, labels %v", instanceId,
fs.Fileshare.AvailabilityZone, instance.Tier, instance.FileShares[0].CapacityGb, instance.Labels)

op, err := g.instancesService.Patch(instanceUri, instance).UpdateMask(fileShareUpdateMask).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("patch operation failed: %v", err)
}
updateOp = op
log.Infof("Patch File share, Operation Resource = %v", op.Name)

return &file.UpdateFileShareResponse{
Fileshare: fs.Fileshare,
}, nil
}

func (g GcpAdapter) DeleteFileShare(ctx context.Context, fs *file.DeleteFileShareRequest) (*file.DeleteFileShareResponse, error) {
panic("implement me")
instance, err := g.GetInstance(ctx, fs.Fileshare)
if err != nil {
log.Error(err)
return nil, err
}
log.Debugf("Found Instance %+v", instance)

log.Infof("Starting DeleteInstance operation on cloud backend")
instanceId := g.getInstanceId(fs.Fileshare.Name)
op, err := g.instancesService.Delete(g.instanceURI(g.backend.Region, fs.Fileshare.AvailabilityZone, instanceId)).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("deleteInstance operation failed: %v", err)
}
deleteOp = op
log.Infof("Delete File share, Operation Resource = %v", op.Name)

return &file.DeleteFileShareResponse{}, nil
}

func (g *GcpAdapter) ParseFileShare(instance *gcpfilev1.Instance) (*file.FileShare, error) {
var tags []*file.Tag
for key, value := range instance.Labels {
tags = append(tags, &file.Tag{
Key: key,
Value: value,
})
}

meta := map[string]interface{}{
InstanceResourceName: instance.Name,
CreationTimeAtBackend: instance.CreateTime,
Etag: instance.Etag,
Networks: instance.Networks,
State: instance.State,
StatusMessage: instance.StatusMessage,
}

metadata, err := utils.ConvertMapToStruct(meta)
if err != nil {
log.Errorf("failed to convert metadata = [%+v] to struct", metadata, err)
return nil, err
}

fileshare := &file.FileShare{
Size: instance.FileShares[0].CapacityGb * utils.GB_FACTOR,
Tags: tags,
Metadata: metadata,
}

return fileshare, nil
}

func (g *GcpAdapter) GetInstance(ctx context.Context, fs *file.FileShare) (*gcpfilev1.Instance, error) {
instanceId := g.getInstanceId(fs.Name)
instance, err := g.instancesService.Get(g.instanceURI(g.backend.Region, fs.AvailabilityZone, instanceId)).Context(ctx).Do()
if err != nil || instance == nil {
return nil, fmt.Errorf("failed to get instance %v", err)
}

if instance.State != "READY" {
return instance, fmt.Errorf("instance %s is in %s", instanceId, instance.State)
}

return instance, nil
}

func (g *GcpAdapter) checkForOperation(ctx context.Context, isOpDone bool, op *gcpfilev1.Operation, opType string, err error) error {
if op != nil && !isOpDone && err == nil {
err = g.waitForOperation(ctx, op)
if err != nil {
log.Errorf("wait For %v operation failed: %v", opType, err)
return err
}
}
return nil
}

func (g *GcpAdapter) waitForOperation(ctx context.Context, op *gcpfilev1.Operation) error {
return wait.Poll(5*time.Second, 5*time.Minute, func() (bool, error) {
pollOp, err := g.operationsService.Get(op.Name).Context(ctx).Do()
if err != nil {
return false, err
}
return g.isOperationDone(pollOp)
})
}

func (g GcpAdapter) isOperationDone(op *gcpfilev1.Operation) (bool, error) {
if op == nil {
return false, nil
}
if op.Error != nil {
return true, fmt.Errorf("operation %s failed (%s): %s", op.Name, op.Error.Code, op.Error.Message)
}
return op.Done, nil
}

func (g GcpAdapter) getInstanceId(fsName string) string {
return fsName + "-instance"
}

func (g GcpAdapter) locationURI(project, location string) string {
return fmt.Sprintf(locationURIFmt, project, location)
}

func (g GcpAdapter) instanceURI(project, location, name string) string {
return fmt.Sprintf(instanceURIFmt, project, location, name)
}

func (g GcpAdapter) Close() error {
panic("implement me")
// TODO:
return nil
}

11 changes: 10 additions & 1 deletion file/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (f *fileService) UpdateFileShare(ctx context.Context, in *pb.UpdateFileShar
}

in.Fileshare.Name = res.Name
in.Fileshare.AvailabilityZone = res.AvailabilityZone
if backend.Backend.Type == constants.BackendTypeAwsFile {
metaMap, err := utils.ConvertMetadataStructToMap(in.Fileshare.Metadata)
if err != nil {
Expand All @@ -277,6 +278,13 @@ func (f *fileService) UpdateFileShare(ctx context.Context, in *pb.UpdateFileShar
return err
}
in.Fileshare.Metadata = metaStruct
} else if backend.Backend.Type == constants.BackendTypeGcsFile {
metaStruct, err := driverutils.ConvertMapToStruct(res.Metadata)
if err != nil {
log.Errorf("Failed to convert metaMap: [%+v] to metaStruct %s\n", res.Metadata, err)
return err
}
in.Fileshare.Metadata = metaStruct
}

fs, err := sd.UpdatefileShare(ctx, in)
Expand Down Expand Up @@ -383,6 +391,7 @@ func (f *fileService) DeleteFileShare(ctx context.Context, in *pb.DeleteFileShar

fileshare := &pb.FileShare{
Name: res.Name,
AvailabilityZone: res.AvailabilityZone,
}

metadata, err := driverutils.ConvertMapToStruct(res.Metadata)
Expand Down Expand Up @@ -421,7 +430,7 @@ func (f *fileService) SyncFileShare(ctx context.Context, fs *pb.FileShare, backe
time.Sleep(6 * time.Second)

ctxBg := context.Background()
ctxBg, _ = context.WithTimeout(ctxBg, 10 * time.Second)
ctxBg, _ = context.WithTimeout(ctxBg, 10 * time.Minute)

fsGetInput := &pb.GetFileShareRequest{Fileshare: fs}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ require (
google.golang.org/protobuf v1.22.0
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f
gopkg.in/natefinch/lumberjack.v2 v2.0.0
k8s.io/apimachinery v0.18.5
)
Loading

0 comments on commit 8e8430a

Please sign in to comment.