Skip to content

Commit

Permalink
feat: support nfs floating ips
Browse files Browse the repository at this point in the history
  • Loading branch information
kristina-solovyova committed Sep 3, 2024
1 parent 47d0321 commit aa0f366
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 31 deletions.
94 changes: 94 additions & 0 deletions cloud-functions/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ func generateInstanceNamesFilter(instanceNames []string) (namesFilter string) {
return
}

func GetInstancesAliasIps(ctx context.Context, instances []*computepb.Instance) (aliasIps []string) {
for _, instance := range instances {
for _, networkInterface := range instance.NetworkInterfaces {
for _, aliasIp := range networkInterface.AliasIpRanges {
ip := strings.Split(*aliasIp.IpCidrRange, "/")[0]
aliasIps = append(aliasIps, ip)
}
}
}
return
}

func GetInstances(ctx context.Context, project, zone string, instanceNames []string) (instances []*computepb.Instance, err error) {
if len(instanceNames) == 0 {
log.Warn().Msg("Got empty instance names list")
Expand Down Expand Up @@ -743,6 +755,88 @@ func CreateBucket(ctx context.Context, project, region, obsName string) (err err
return
}

func ListInstanceInternalStaticIps(ctx context.Context, project, region, instanceName string) (ips []string, err error) {
ipClient, err := compute.NewAddressesRESTClient(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to create ips client")
return
}
defer ipClient.Close()

staticIps, err := listInstanceInternalStaticIps(ctx, ipClient, project, region, instanceName)
if err != nil {
return
}

for _, ip := range staticIps {
ips = append(ips, *ip.Address)
}
return
}

func listInstanceInternalStaticIps(ctx context.Context, client *compute.AddressesClient, project, region, instanceName string) (ips []*computepb.Address, err error) {
filter := fmt.Sprintf("name:%s-ip-*", instanceName)

req := &computepb.ListAddressesRequest{
Project: project,
Region: region,
Filter: &filter,
}

it := client.List(ctx, req)
for {
ip, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, fmt.Errorf("failed to list internal static IPs: %w", err)
}
ips = append(ips, ip)
}
return
}

func ReleaseInstanceInternalStaticIps(ctx context.Context, project, zone, instanceName string) (err error) {
region := zone[:len(zone)-2]

ipClient, err := compute.NewAddressesRESTClient(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to create ips client")
return
}
defer ipClient.Close()

staticIps, err := listInstanceInternalStaticIps(ctx, ipClient, project, region, instanceName)
if err != nil {
return
}

ipNames := make([]string, len(staticIps))
for i, ip := range staticIps {
ipNames[i] = *ip.Name
}

var errs []error
for _, ipName := range ipNames {
req := &computepb.DeleteAddressRequest{
Project: project,
Region: region,
Address: ipName,
}
_, err := ipClient.Delete(ctx, req)
if err != nil {
errs = append(errs, err)
log.Error().Err(err).Msgf("Failed to delete internal static IP %s", ipName)
}
}

if len(errs) > 0 {
err = fmt.Errorf("failed to release internal static IPs: %v", errs)
}
return
}

const FindDrivesScript = `
import json
import sys
Expand Down
33 changes: 31 additions & 2 deletions cloud-functions/functions/clusterize/clusterize.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -90,10 +91,22 @@ func NFSClusterize(ctx context.Context, p ClusterizationParams) (clusterizeScrip
nfsInterfaceGroupName := os.Getenv("NFS_INTERFACE_GROUP_NAME")
nfsProtocolgwsNum, _ := strconv.Atoi(os.Getenv("NFS_PROTOCOL_GATEWAYS_NUM"))
nfsSecondaryIpsNum, _ := strconv.Atoi(os.Getenv("NFS_SECONDARY_IPS_NUM"))
gateways := strings.Split(os.Getenv("GATEWAYS"), ",")
subnets := strings.Split(os.Getenv("SUBNETS"), ",")

funcDef := gcp_functions_def.NewFuncDef(p.CloudFuncRootUrl)
reportFunction := funcDef.GetFunctionCmdDefinition(functions_def.Report)

_, ipNet, err := net.ParseCIDR(subnets[0])
if err != nil {
err = fmt.Errorf("failed to parse subnet cidr %s: %w", subnets[0], err)
clusterizeScript = cloudCommon.GetErrorScript(err, reportFunction, p.Vm.Protocol)
return
}

gateway := gateways[0]
subnetMask := net.IP(ipNet.Mask).String()

state, err := common.AddInstanceToStateInstances(ctx, p.Bucket, p.NFSStateObject, p.Vm)
if err != nil {
var e *common.ExtraInstanceForClusterizationError
Expand Down Expand Up @@ -129,15 +142,31 @@ func NFSClusterize(ctx context.Context, p ClusterizationParams) (clusterizeScrip
nicNames = append(nicNames, instance.NicName)
}

// TODO: add nfsSecondaryIpsNum check
secondaryIps := make([]string, 0, nfsSecondaryIpsNum)
instancesNames := common.GetInstancesNames(state.Instances)
nfsInstances, err := common.GetInstances(ctx, p.Project, p.Zone, instancesNames)
if err != nil {
err = fmt.Errorf("failed to get instances: %w", err)
log.Error().Err(err).Send()
clusterizeScript = cloudCommon.GetErrorScript(err, reportFunction, p.Vm.Protocol)
return
}

secondaryIps := common.GetInstancesAliasIps(ctx, nfsInstances)
if len(secondaryIps) < nfsSecondaryIpsNum {
err = fmt.Errorf("not enough secondary ips found: %d/%d", len(secondaryIps), nfsSecondaryIpsNum)
log.Error().Err(err).Send()
clusterizeScript = cloudCommon.GetErrorScript(err, reportFunction, p.Vm.Protocol)
return
}

nfsParams := protocol.NFSParams{
InterfaceGroupName: nfsInterfaceGroupName,
SecondaryIps: secondaryIps,
ContainersUid: containersUid,
NicNames: nicNames,
HostsNum: nfsProtocolgwsNum,
Gateway: gateway,
SubnetMask: subnetMask,
}

scriptGenerator := clusterize.ConfigureNfsScriptGenerator{
Expand Down
111 changes: 109 additions & 2 deletions cloud-functions/functions/scale_up/scale_up.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"

compute "cloud.google.com/go/compute/apiv1"
"cloud.google.com/go/compute/apiv1/computepb"
Expand Down Expand Up @@ -152,7 +153,7 @@ func CreateBackendInstance(ctx context.Context, project, zone, template, instanc
return
}

func CreateNFSInstance(ctx context.Context, project, zone, templateName, instanceName, yumRepoServer, proxyUrl, functionRootUrl string) (err error) {
func CreateNFSInstance(ctx context.Context, project, zone, templateName, instanceName, yumRepoServer, proxyUrl, functionRootUrl string, secondaryIpsNum int) (err error) {
instancesClient, err := compute.NewInstancesRESTClient(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to create instances client")
Expand All @@ -166,11 +167,30 @@ func CreateNFSInstance(ctx context.Context, project, zone, templateName, instanc
return
}

subnet := instanceTemplate.Properties.NetworkInterfaces[0].Subnetwork
region := zone[:len(zone)-2]

ips, err := createInternalStaticIps(ctx, project, region, *subnet, instanceName, secondaryIpsNum)
if err != nil {
log.Error().Err(err).Msg("Failed to create secondary IPs")
return
}

nics := instanceTemplate.Properties.NetworkInterfaces
// modify fisrt nic to use alias ips
nics[0].AliasIpRanges = make([]*computepb.AliasIpRange, len(ips))
for i, ip := range ips {
nics[0].AliasIpRanges[i] = &computepb.AliasIpRange{
IpCidrRange: &ip,
}
}

req := &computepb.InsertInstanceRequest{
Project: project,
Zone: zone,
InstanceResource: &computepb.Instance{
Name: proto.String(instanceName),
Name: proto.String(instanceName),
NetworkInterfaces: nics,
},
SourceInstanceTemplate: instanceTemplate.SelfLink,
}
Expand Down Expand Up @@ -325,3 +345,90 @@ func getProtocolGwInstancesFromInterfaceGroup(ctx context.Context, jpool *jrpc.P
}
return
}

func createInternalStaticIps(ctx context.Context, project, region, subnet, instanceName string, ipsNum int) (ips []string, err error) {
ipClient, err := compute.NewAddressesRESTClient(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to create ips client")
return
}
defer ipClient.Close()

defer func() {
if err != nil {
for _, ip := range ips {
req := &computepb.DeleteAddressRequest{
Project: project,
Region: region,
Address: ip,
}
_, err := ipClient.Delete(ctx, req)
if err != nil {
log.Error().Err(err).Msgf("Failed to delete internal static IP %s", ip)
}
}
}
}()

var wg sync.WaitGroup
errCh := make(chan error, ipsNum)
defer close(errCh)

for i := 0; i < ipsNum; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()

addressName := fmt.Sprintf("%s-ip-%d", instanceName, i)
addressType := "INTERNAL"
purpose := "GCE_ENDPOINT"
address := &computepb.Address{
Name: &addressName,
Subnetwork: &subnet,
AddressType: &addressType,
Region: &region,
Purpose: &purpose,
}

req := &computepb.InsertAddressRequest{
Project: project,
Region: region,
AddressResource: address,
}

operation, err := ipClient.Insert(ctx, req)
if err != nil {
errCh <- fmt.Errorf("failed to create internal static IP %s: %w", addressName, err)
return
}

err = operation.Wait(ctx)
if err != nil {
errCh <- fmt.Errorf("failed to wait for internal static IP creation %s: %w", addressName, err)
}
}(i)
}

wg.Wait()

// get all errors
var errs []string
for i := 0; i < ipsNum; i++ {
select {
case err := <-errCh:
errs = append(errs, err.Error())
default:
}
}
if len(errs) > 0 {
err = fmt.Errorf("failed to create internal static IPs: %s", strings.Join(errs, ", "))
return nil, err
}

ips, err = common.ListInstanceInternalStaticIps(ctx, project, region, instanceName)
if err != nil {
return
}
log.Info().Msgf("Created internal static IPs: %v", ips)
return
}
17 changes: 16 additions & 1 deletion cloud-functions/functions/terminate_cluster/terminate_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func DeleteStateObject(ctx context.Context, bucket, object string) (err error) {
return stateHandler.Delete(ctx)
}

func TerminateInstances(ctx context.Context, project, zone, labelKey, labelValue string) (terminatingInstances []string, errs []error) {
func TerminateInstances(ctx context.Context, project, zone, labelKey, labelValue string, releaseIps bool) (terminatingInstances []string, errs []error) {
instances, err := common.GetInstancesByLabel(ctx, project, zone, labelKey, labelValue)
if err != nil {
errs = append(errs, err)
Expand All @@ -39,5 +39,20 @@ func TerminateInstances(ctx context.Context, project, zone, labelKey, labelValue
}
terminatingInstances, errs2 := common.TerminateInstances(ctx, project, zone, instanceNames)
errs = append(errs, errs2...)

if releaseIps {
errs2 = releaseInstancesIps(ctx, project, zone, instanceNames)
errs = append(errs, errs2...)
}
return
}

func releaseInstancesIps(ctx context.Context, project, zone string, instances []string) (errs []error) {
for _, instance := range instances {
err := common.ReleaseInstanceInternalStaticIps(ctx, project, zone, instance)
if err != nil {
errs = append(errs, err)
}
}
return
}
2 changes: 1 addition & 1 deletion cloud-functions/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/googleapis/gax-go/v2 v2.12.5
github.com/lithammer/dedent v1.1.0
github.com/rs/zerolog v1.29.1
github.com/weka/go-cloud-lib v0.0.0-20240827073450-1db7d60030ab
github.com/weka/go-cloud-lib v0.0.0-20240903103548-6e9f9a867b8b
google.golang.org/api v0.186.0
google.golang.org/protobuf v1.34.2
)
Expand Down
4 changes: 2 additions & 2 deletions cloud-functions/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/weka/go-cloud-lib v0.0.0-20240827073450-1db7d60030ab h1:/DX4vsQ6XBerb6Z1uR4mnV+170Z8bvlsg9CrEnbW1YI=
github.com/weka/go-cloud-lib v0.0.0-20240827073450-1db7d60030ab/go.mod h1:FCQuk2bLvtDHe2Kjsu0oInJP1VOVsuxqPGHGMmVIPMg=
github.com/weka/go-cloud-lib v0.0.0-20240903103548-6e9f9a867b8b h1:3t+4QYDVA60EMiIYGUhruwmxt6n2Maabq3jrdQCJAjU=
github.com/weka/go-cloud-lib v0.0.0-20240903103548-6e9f9a867b8b/go.mod h1:FCQuk2bLvtDHe2Kjsu0oInJP1VOVsuxqPGHGMmVIPMg=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg=
Expand Down
9 changes: 5 additions & 4 deletions cloud-functions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func ScaleUp(w http.ResponseWriter, r *http.Request) {
usernameId := os.Getenv("USER_NAME_ID")
adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID")
deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID")
nfsSecondaryIpsNum, _ := strconv.Atoi(os.Getenv("NFS_SECONDARY_IPS_NUM"))

ctx := r.Context()
backends, err := common.GetInstancesByClusterLabel(ctx, project, zone, clusterName)
Expand Down Expand Up @@ -491,8 +492,8 @@ func ScaleUp(w http.ResponseWriter, r *http.Request) {
for i := nfsGatewaysNumber; i < nfsDesiredSize; i++ {
instanceName := fmt.Sprintf("%s-%s%03d", nfsGatewaysName, currentTime, i)
log.Info().Msgf("creating new NFS instance: %s", instanceName)
if err := scale_up.CreateNFSInstance(ctx, project, zone, nfsTemplateName, instanceName, yumRepoServer, proxyUrl, functionRootUrl); err != nil {
err = fmt.Errorf("instance %s creation failed %s", instanceName, err)
if err := scale_up.CreateNFSInstance(ctx, project, zone, nfsTemplateName, instanceName, yumRepoServer, proxyUrl, functionRootUrl, nfsSecondaryIpsNum); err != nil {
err = fmt.Errorf("instance %s creation failed %s.", instanceName, err)
log.Error().Err(err).Send()
respondWithErr(w, err, http.StatusBadRequest)
return
Expand Down Expand Up @@ -689,7 +690,7 @@ func TerminateCluster(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "Deleted NFS state successfully.")
}

terminatingNfsInstances, errs := terminate_cluster.TerminateInstances(ctx, project, zone, common.WekaProtocolGwLabelKey, nfsGatewaysName)
terminatingNfsInstances, errs := terminate_cluster.TerminateInstances(ctx, project, zone, common.WekaProtocolGwLabelKey, nfsGatewaysName, true)
if len(errs) > 0 {
fmt.Fprintf(w, "Got the following failure while terminating NFS instances: %s.", errs)
}
Expand All @@ -713,7 +714,7 @@ func TerminateCluster(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "Deleted cluster state successfully.")
}

terminatingInstances, errs := terminate_cluster.TerminateInstances(ctx, project, zone, common.WekaClusterLabelKey, d.Name)
terminatingInstances, errs := terminate_cluster.TerminateInstances(ctx, project, zone, common.WekaClusterLabelKey, d.Name, false)
if len(errs) > 0 {
fmt.Fprintf(w, "Got the following failure while terminating instances: %s.", errs)
}
Expand Down
Loading

0 comments on commit aa0f366

Please sign in to comment.