diff --git a/cloud-functions/common/common.go b/cloud-functions/common/common.go index 0d08f8ed..2d4e1ef3 100644 --- a/cloud-functions/common/common.go +++ b/cloud-functions/common/common.go @@ -162,6 +162,18 @@ func generateInstanceNamesFilter(instanceNames []string) (namesFilter string) { return } +func GetInstancesAliasIps(ctx context.Context, instances []*computepb.Instance) (aliasIps []string) { + for _, instance := range instances { + for _, networkInterface := range instance.NetworkInterfaces { + for _, aliasIp := range networkInterface.AliasIpRanges { + ip := strings.Split(*aliasIp.IpCidrRange, "/")[0] + aliasIps = append(aliasIps, ip) + } + } + } + return +} + func GetInstances(ctx context.Context, project, zone string, instanceNames []string) (instances []*computepb.Instance, err error) { if len(instanceNames) == 0 { log.Warn().Msg("Got empty instance names list") @@ -743,6 +755,88 @@ func CreateBucket(ctx context.Context, project, region, obsName string) (err err return } +func ListInstanceInternalStaticIps(ctx context.Context, project, region, instanceName string) (ips []string, err error) { + ipClient, err := compute.NewAddressesRESTClient(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to create ips client") + return + } + defer ipClient.Close() + + staticIps, err := listInstanceInternalStaticIps(ctx, ipClient, project, region, instanceName) + if err != nil { + return + } + + for _, ip := range staticIps { + ips = append(ips, *ip.Address) + } + return +} + +func listInstanceInternalStaticIps(ctx context.Context, client *compute.AddressesClient, project, region, instanceName string) (ips []*computepb.Address, err error) { + filter := fmt.Sprintf("name:%s-ip-*", instanceName) + + req := &computepb.ListAddressesRequest{ + Project: project, + Region: region, + Filter: &filter, + } + + it := client.List(ctx, req) + for { + ip, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, fmt.Errorf("failed to list 
internal static IPs: %w", err) + } + ips = append(ips, ip) + } + return +} + +func ReleaseInstanceInternalStaticIps(ctx context.Context, project, zone, instanceName string) (err error) { + region := zone[:len(zone)-2] + + ipClient, err := compute.NewAddressesRESTClient(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to create ips client") + return + } + defer ipClient.Close() + + staticIps, err := listInstanceInternalStaticIps(ctx, ipClient, project, region, instanceName) + if err != nil { + return + } + + ipNames := make([]string, len(staticIps)) + for i, ip := range staticIps { + ipNames[i] = *ip.Name + } + + var errs []error + for _, ipName := range ipNames { + req := &computepb.DeleteAddressRequest{ + Project: project, + Region: region, + Address: ipName, + } + _, err := ipClient.Delete(ctx, req) + if err != nil { + errs = append(errs, err) + log.Error().Err(err).Msgf("Failed to delete internal static IP %s", ipName) + } + } + + if len(errs) > 0 { + err = fmt.Errorf("failed to release internal static IPs: %v", errs) + } + return +} + const FindDrivesScript = ` import json import sys diff --git a/cloud-functions/functions/clusterize/clusterize.go b/cloud-functions/functions/clusterize/clusterize.go index 0d973942..7142eb5b 100644 --- a/cloud-functions/functions/clusterize/clusterize.go +++ b/cloud-functions/functions/clusterize/clusterize.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net" "os" "strconv" "strings" @@ -90,10 +91,22 @@ func NFSClusterize(ctx context.Context, p ClusterizationParams) (clusterizeScrip nfsInterfaceGroupName := os.Getenv("NFS_INTERFACE_GROUP_NAME") nfsProtocolgwsNum, _ := strconv.Atoi(os.Getenv("NFS_PROTOCOL_GATEWAYS_NUM")) nfsSecondaryIpsNum, _ := strconv.Atoi(os.Getenv("NFS_SECONDARY_IPS_NUM")) + gateways := strings.Split(os.Getenv("GATEWAYS"), ",") + subnets := strings.Split(os.Getenv("SUBNETS"), ",") funcDef := gcp_functions_def.NewFuncDef(p.CloudFuncRootUrl) reportFunction := 
funcDef.GetFunctionCmdDefinition(functions_def.Report) + _, ipNet, err := net.ParseCIDR(subnets[0]) + if err != nil { + err = fmt.Errorf("failed to parse subnet cidr %s: %w", subnets[0], err) + clusterizeScript = cloudCommon.GetErrorScript(err, reportFunction, p.Vm.Protocol) + return + } + + gateway := gateways[0] + subnetMask := net.IP(ipNet.Mask).String() + state, err := common.AddInstanceToStateInstances(ctx, p.Bucket, p.NFSStateObject, p.Vm) if err != nil { var e *common.ExtraInstanceForClusterizationError @@ -129,8 +142,22 @@ func NFSClusterize(ctx context.Context, p ClusterizationParams) (clusterizeScrip nicNames = append(nicNames, instance.NicName) } - // TODO: add nfsSecondaryIpsNum check - secondaryIps := make([]string, 0, nfsSecondaryIpsNum) + instancesNames := common.GetInstancesNames(state.Instances) + nfsInstances, err := common.GetInstances(ctx, p.Project, p.Zone, instancesNames) + if err != nil { + err = fmt.Errorf("failed to get instances: %w", err) + log.Error().Err(err).Send() + clusterizeScript = cloudCommon.GetErrorScript(err, reportFunction, p.Vm.Protocol) + return + } + + secondaryIps := common.GetInstancesAliasIps(ctx, nfsInstances) + if len(secondaryIps) < nfsSecondaryIpsNum { + err = fmt.Errorf("not enough secondary ips found: %d/%d", len(secondaryIps), nfsSecondaryIpsNum) + log.Error().Err(err).Send() + clusterizeScript = cloudCommon.GetErrorScript(err, reportFunction, p.Vm.Protocol) + return + } nfsParams := protocol.NFSParams{ InterfaceGroupName: nfsInterfaceGroupName, @@ -138,6 +165,8 @@ func NFSClusterize(ctx context.Context, p ClusterizationParams) (clusterizeScrip ContainersUid: containersUid, NicNames: nicNames, HostsNum: nfsProtocolgwsNum, + Gateway: gateway, + SubnetMask: subnetMask, } scriptGenerator := clusterize.ConfigureNfsScriptGenerator{ diff --git a/cloud-functions/functions/scale_up/scale_up.go b/cloud-functions/functions/scale_up/scale_up.go index 335e010a..5745d6ea 100644 --- 
a/cloud-functions/functions/scale_up/scale_up.go +++ b/cloud-functions/functions/scale_up/scale_up.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" compute "cloud.google.com/go/compute/apiv1" "cloud.google.com/go/compute/apiv1/computepb" @@ -152,7 +153,7 @@ func CreateBackendInstance(ctx context.Context, project, zone, template, instanc return } -func CreateNFSInstance(ctx context.Context, project, zone, templateName, instanceName, yumRepoServer, proxyUrl, functionRootUrl string) (err error) { +func CreateNFSInstance(ctx context.Context, project, zone, templateName, instanceName, yumRepoServer, proxyUrl, functionRootUrl string, secondaryIpsNum int) (err error) { instancesClient, err := compute.NewInstancesRESTClient(ctx) if err != nil { log.Error().Err(err).Msg("Failed to create instances client") @@ -166,11 +167,30 @@ func CreateNFSInstance(ctx context.Context, project, zone, templateName, instanc return } + subnet := instanceTemplate.Properties.NetworkInterfaces[0].Subnetwork + region := zone[:len(zone)-2] + + ips, err := createInternalStaticIps(ctx, project, region, *subnet, instanceName, secondaryIpsNum) + if err != nil { + log.Error().Err(err).Msg("Failed to create secondary IPs") + return + } + + nics := instanceTemplate.Properties.NetworkInterfaces + // modify first nic to use alias ips + nics[0].AliasIpRanges = make([]*computepb.AliasIpRange, len(ips)) + for i, ip := range ips { + nics[0].AliasIpRanges[i] = &computepb.AliasIpRange{ + IpCidrRange: &ip, + } + } + req := &computepb.InsertInstanceRequest{ Project: project, Zone: zone, InstanceResource: &computepb.Instance{ - Name: proto.String(instanceName), + Name: proto.String(instanceName), + NetworkInterfaces: nics, }, SourceInstanceTemplate: instanceTemplate.SelfLink, } @@ -325,3 +345,90 @@ func getProtocolGwInstancesFromInterfaceGroup(ctx context.Context, jpool *jrpc.P } return } + +func createInternalStaticIps(ctx context.Context, project, region, subnet, instanceName string, ipsNum int) 
(ips []string, err error) { + ipClient, err := compute.NewAddressesRESTClient(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to create ips client") + return + } + defer ipClient.Close() + + defer func() { + if err != nil { + for _, ip := range ips { + req := &computepb.DeleteAddressRequest{ + Project: project, + Region: region, + Address: ip, + } + _, err := ipClient.Delete(ctx, req) + if err != nil { + log.Error().Err(err).Msgf("Failed to delete internal static IP %s", ip) + } + } + } + }() + + var wg sync.WaitGroup + errCh := make(chan error, ipsNum) + defer close(errCh) + + for i := 0; i < ipsNum; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + addressName := fmt.Sprintf("%s-ip-%d", instanceName, i) + addressType := "INTERNAL" + purpose := "GCE_ENDPOINT" + address := &computepb.Address{ + Name: &addressName, + Subnetwork: &subnet, + AddressType: &addressType, + Region: &region, + Purpose: &purpose, + } + + req := &computepb.InsertAddressRequest{ + Project: project, + Region: region, + AddressResource: address, + } + + operation, err := ipClient.Insert(ctx, req) + if err != nil { + errCh <- fmt.Errorf("failed to create internal static IP %s: %w", addressName, err) + return + } + + err = operation.Wait(ctx) + if err != nil { + errCh <- fmt.Errorf("failed to wait for internal static IP creation %s: %w", addressName, err) + } + }(i) + } + + wg.Wait() + + // get all errors + var errs []string + for i := 0; i < ipsNum; i++ { + select { + case err := <-errCh: + errs = append(errs, err.Error()) + default: + } + } + if len(errs) > 0 { + err = fmt.Errorf("failed to create internal static IPs: %s", strings.Join(errs, ", ")) + return nil, err + } + + ips, err = common.ListInstanceInternalStaticIps(ctx, project, region, instanceName) + if err != nil { + return + } + log.Info().Msgf("Created internal static IPs: %v", ips) + return +} diff --git a/cloud-functions/functions/terminate_cluster/terminate_cluster.go 
b/cloud-functions/functions/terminate_cluster/terminate_cluster.go index 5e2c43e8..ee42508d 100644 --- a/cloud-functions/functions/terminate_cluster/terminate_cluster.go +++ b/cloud-functions/functions/terminate_cluster/terminate_cluster.go @@ -20,7 +20,7 @@ func DeleteStateObject(ctx context.Context, bucket, object string) (err error) { return stateHandler.Delete(ctx) } -func TerminateInstances(ctx context.Context, project, zone, labelKey, labelValue string) (terminatingInstances []string, errs []error) { +func TerminateInstances(ctx context.Context, project, zone, labelKey, labelValue string, releaseIps bool) (terminatingInstances []string, errs []error) { instances, err := common.GetInstancesByLabel(ctx, project, zone, labelKey, labelValue) if err != nil { errs = append(errs, err) @@ -39,5 +39,20 @@ func TerminateInstances(ctx context.Context, project, zone, labelKey, labelValue } terminatingInstances, errs2 := common.TerminateInstances(ctx, project, zone, instanceNames) errs = append(errs, errs2...) + + if releaseIps { + errs2 = releaseInstancesIps(ctx, project, zone, instanceNames) + errs = append(errs, errs2...) 
+ } + return +} + +func releaseInstancesIps(ctx context.Context, project, zone string, instances []string) (errs []error) { + for _, instance := range instances { + err := common.ReleaseInstanceInternalStaticIps(ctx, project, zone, instance) + if err != nil { + errs = append(errs, err) + } + } return } diff --git a/cloud-functions/go.mod b/cloud-functions/go.mod index fa67c006..e10396bf 100644 --- a/cloud-functions/go.mod +++ b/cloud-functions/go.mod @@ -9,7 +9,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.5 github.com/lithammer/dedent v1.1.0 github.com/rs/zerolog v1.29.1 - github.com/weka/go-cloud-lib v0.0.0-20240827073450-1db7d60030ab + github.com/weka/go-cloud-lib v0.0.0-20240903103548-6e9f9a867b8b google.golang.org/api v0.186.0 google.golang.org/protobuf v1.34.2 ) diff --git a/cloud-functions/go.sum b/cloud-functions/go.sum index 122c23bd..6139a5cc 100644 --- a/cloud-functions/go.sum +++ b/cloud-functions/go.sum @@ -96,8 +96,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/weka/go-cloud-lib v0.0.0-20240827073450-1db7d60030ab h1:/DX4vsQ6XBerb6Z1uR4mnV+170Z8bvlsg9CrEnbW1YI= -github.com/weka/go-cloud-lib v0.0.0-20240827073450-1db7d60030ab/go.mod h1:FCQuk2bLvtDHe2Kjsu0oInJP1VOVsuxqPGHGMmVIPMg= +github.com/weka/go-cloud-lib v0.0.0-20240903103548-6e9f9a867b8b h1:3t+4QYDVA60EMiIYGUhruwmxt6n2Maabq3jrdQCJAjU= +github.com/weka/go-cloud-lib v0.0.0-20240903103548-6e9f9a867b8b/go.mod h1:FCQuk2bLvtDHe2Kjsu0oInJP1VOVsuxqPGHGMmVIPMg= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc 
v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= diff --git a/cloud-functions/main.go b/cloud-functions/main.go index 76b8badb..ce2f769a 100644 --- a/cloud-functions/main.go +++ b/cloud-functions/main.go @@ -405,6 +405,7 @@ func ScaleUp(w http.ResponseWriter, r *http.Request) { usernameId := os.Getenv("USER_NAME_ID") adminPasswordId := os.Getenv("ADMIN_PASSWORD_ID") deploymentPasswordId := os.Getenv("DEPLOYMENT_PASSWORD_ID") + nfsSecondaryIpsNum, _ := strconv.Atoi(os.Getenv("NFS_SECONDARY_IPS_NUM")) ctx := r.Context() backends, err := common.GetInstancesByClusterLabel(ctx, project, zone, clusterName) @@ -491,8 +492,8 @@ func ScaleUp(w http.ResponseWriter, r *http.Request) { for i := nfsGatewaysNumber; i < nfsDesiredSize; i++ { instanceName := fmt.Sprintf("%s-%s%03d", nfsGatewaysName, currentTime, i) log.Info().Msgf("creating new NFS instance: %s", instanceName) - if err := scale_up.CreateNFSInstance(ctx, project, zone, nfsTemplateName, instanceName, yumRepoServer, proxyUrl, functionRootUrl); err != nil { - err = fmt.Errorf("instance %s creation failed %s", instanceName, err) + if err := scale_up.CreateNFSInstance(ctx, project, zone, nfsTemplateName, instanceName, yumRepoServer, proxyUrl, functionRootUrl, nfsSecondaryIpsNum); err != nil { + err = fmt.Errorf("instance %s creation failed %s.", instanceName, err) log.Error().Err(err).Send() respondWithErr(w, err, http.StatusBadRequest) return @@ -689,7 +690,7 @@ func TerminateCluster(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "Deleted NFS state successfully.") } - terminatingNfsInstances, errs := terminate_cluster.TerminateInstances(ctx, project, zone, common.WekaProtocolGwLabelKey, nfsGatewaysName) + terminatingNfsInstances, errs := terminate_cluster.TerminateInstances(ctx, project, zone, common.WekaProtocolGwLabelKey, nfsGatewaysName, true) if len(errs) > 0 { fmt.Fprintf(w, "Got the following failure while terminating NFS instances: %s.", errs) } @@ -713,7 +714,7 @@ func TerminateCluster(w 
http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "Deleted cluster state successfully.") } - terminatingInstances, errs := terminate_cluster.TerminateInstances(ctx, project, zone, common.WekaClusterLabelKey, d.Name) + terminatingInstances, errs := terminate_cluster.TerminateInstances(ctx, project, zone, common.WekaClusterLabelKey, d.Name, false) if len(errs) > 0 { fmt.Fprintf(w, "Got the following failure while terminating instances: %s.", errs) } diff --git a/cloud_functions.tf b/cloud_functions.tf index caa63902..5334cd65 100644 --- a/cloud_functions.tf +++ b/cloud_functions.tf @@ -72,7 +72,7 @@ resource "google_cloudfunctions2_function" "cloud_internal_function" { INSTANCE_GROUP : google_compute_instance_group.this.name NFS_INSTANCE_GROUP : var.nfs_setup_protocol ? google_compute_instance_group.nfs[0].name : "" GATEWAYS : join(",", [for s in data.google_compute_subnetwork.this : s.gateway_address]) - SUBNETS : format("(%s)", join(" ", [for s in data.google_compute_subnetwork.this : s.ip_cidr_range])) + SUBNETS : join(",", [for s in data.google_compute_subnetwork.this : s.ip_cidr_range]) USER_NAME_ID : google_secret_manager_secret.secret_weka_username.id ADMIN_PASSWORD_ID = google_secret_manager_secret.secret_weka_password.id DEPLOYMENT_PASSWORD_ID : google_secret_manager_secret.weka_deployment_password.id diff --git a/modules/protocol_gateways/main.tf b/modules/protocol_gateways/main.tf index 994b1c16..0e860018 100644 --- a/modules/protocol_gateways/main.tf +++ b/modules/protocol_gateways/main.tf @@ -90,12 +90,6 @@ resource "google_compute_instance_template" "this" { subnetwork = data.google_compute_subnetwork.this[network_interface.value].id subnetwork_project = local.network_project_id access_config {} - dynamic "alias_ip_range" { - for_each = range(var.secondary_ips_per_nic) - content { - ip_cidr_range = "/32" - } - } } } # nic with private ip @@ -104,12 +98,6 @@ resource "google_compute_instance_template" "this" { content { subnetwork = 
data.google_compute_subnetwork.this[network_interface.value].id subnetwork_project = local.network_project_id - dynamic "alias_ip_range" { - for_each = range(var.secondary_ips_per_nic) - content { - ip_cidr_range = "/32" - } - } } } diff --git a/modules/service_account/main.tf b/modules/service_account/main.tf index d8458a46..da832846 100644 --- a/modules/service_account/main.tf +++ b/modules/service_account/main.tf @@ -29,7 +29,10 @@ resource "google_project_iam_member" "sa_member_role" { "roles/cloudfunctions.developer", "roles/workflows.invoker", "roles/vpcaccess.serviceAgent", - "roles/pubsub.subscriber" + "roles/pubsub.subscriber", + "roles/compute.networkAdmin", # needed for Internal Static IP creation + "roles/networkmanagement.admin", + "roles/compute.instanceAdmin.v1", # needed for 'compute.instances.updateNetworkInterface' ]) role = each.key member = "serviceAccount:${google_service_account.sa.email}" diff --git a/variables.tf b/variables.tf index 04a8fb92..95c69663 100644 --- a/variables.tf +++ b/variables.tf @@ -495,11 +495,6 @@ variable "nfs_protocol_gateway_secondary_ips_per_nic" { type = number description = "The number of secondary IPs per single NIC per NFS protocol gateway virtual machine." default = 0 - - validation { - condition = var.nfs_protocol_gateway_secondary_ips_per_nic == 0 - error_message = "Secondary (floating) IPs are currently not supported for GCP NFS protocol gateways." - } } variable "nfs_protocol_gateway_machine_type" {