Skip to content

Commit

Permalink
fix: 优化gke扩容时节点申请流程
Browse files Browse the repository at this point in the history
  • Loading branch information
Maclon9573 committed Jul 26, 2024
1 parent 10c181d commit d5d29e4
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,14 @@ var (
SuccessNodeIDsKey ParamKey = "successNodeIDs"
// FailedNodeIDsKey xxx
FailedNodeIDsKey ParamKey = "failedNodeIDs"
// FailureNodeIDsKey xxx
FailureNodeIDsKey ParamKey = "failureNodeIDs"
// TimeoutNodeIDsKey xxx
TimeoutNodeIDsKey ParamKey = "timeoutNodeIDs"

// FailureReason xxx
FailureReason ParamKey = "failureReason"

// SuccessAddClusterNodeIDsKey xxx
// cloud cluster success & failed Instance
SuccessAddClusterNodeIDsKey ParamKey = "successAddClusterNodeIDs"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,26 @@ func recordClusterInstanceToDB(ctx context.Context, state *cloudprovider.TaskSta
state.Task.CommonParams = make(map[string]string)
}

if len(instancesNames) > 0 {
state.Task.CommonParams[cloudprovider.SuccessNodeIDsKey.String()] = strings.Join(instancesNames, ",")
state.Task.CommonParams[cloudprovider.NodeNamesKey.String()] = strings.Join(instancesNames, ",")
state.Task.CommonParams[cloudprovider.NodeIDsKey.String()] = strings.Join(instancesNames, ",")
successIns, failureIns, err := checkInstance(client, instancesNames)
if err != nil {
_ = returnGkeInstancesAndCleanNodes(ctx, info, instancesNames)
blog.Errorf("recordClusterInstanceToDB[%s] checkInstance failed, %v, successInstances[%+v],"+
" failureInstances[%+v]", taskID, err, successIns, failureIns)
state.Task.CommonParams[cloudprovider.SuccessNodeIDsKey.String()] = strings.Join(successIns, ",")
state.Task.CommonParams[cloudprovider.FailureNodeIDsKey.String()] = strings.Join(failureIns, ",")
state.Task.CommonParams[cloudprovider.FailureReason.String()] = err.Error()
return fmt.Errorf("checkInstance failed, %v, successInstances[%+v], failureInstances[%+v]",
err, successIns, failureIns)
}

if len(successIns) > 0 {
state.Task.CommonParams[cloudprovider.SuccessNodeIDsKey.String()] = strings.Join(successIns, ",")
state.Task.CommonParams[cloudprovider.NodeNamesKey.String()] = strings.Join(successIns, ",")
state.Task.CommonParams[cloudprovider.NodeIDsKey.String()] = strings.Join(successIns, ",")
}

// record successNodes to cluster manager DB
nodeIPs, err := transInstancesToNode(ctx, instancesNames, info)
nodeIPs, err := transInstancesToNode(ctx, successIns, info)
if err != nil {
blog.Errorf("recordClusterInstanceToDB[%s] failed: %v", taskID, err)
}
Expand All @@ -271,14 +283,16 @@ func recordClusterInstanceToDB(ctx context.Context, state *cloudprovider.TaskSta
return nil
}

func checkInstance(client *api.ComputeServiceClient, ids []string) error {
func checkInstance(client *api.ComputeServiceClient, ids []string) ([]string, []string, error) {
successIns, failureIns := make([]string, 0), make([]string, 0)
timeCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
defer cancel()
err := loop.LoopDoFunc(timeCtx, func() error {
running, failed := make([]string, 0), make([]string, 0)
insList, err := client.ListZoneInstanceWithFilter(context.Background(), api.InstanceNameFilter(ids))
if err != nil {
blog.Errorf("checkInstance ListZoneInstanceWithFilter failed, %s", err.Error())
return err
return nil
}

// check response data
Expand All @@ -291,18 +305,56 @@ func checkInstance(client *api.ComputeServiceClient, ids []string) error {

for _, in := range insList.Items {
if len(in.NetworkInterfaces[0].NetworkIP) == 0 {
failed = append(failed, in.Name)
blog.Warnf("checkInstance[%s] IP is still not distributed", in.Name)
return nil
continue
}
running = append(running, in.Name)
}

return loop.EndLoop
successIns = running
failureIns = failed

if len(failed) == 0 {
return loop.EndLoop
}

return nil
})
if err != nil {
return err
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return successIns, failureIns, err
}
if errors.Is(err, context.DeadlineExceeded) {
running, failed := make([]string, 0), make([]string, 0)
insList, err := client.ListZoneInstanceWithFilter(context.Background(), api.InstanceNameFilter(ids))
if err != nil {
blog.Errorf("checkInstance ListZoneInstanceWithFilter failed, %s", err.Error())
return nil, nil, err
}

return nil
// check response data
if len(insList.Items) != len(ids) {
blog.Warnf("checkInstance desired %d, but got %d, instances[%+v]", len(ids), len(insList.Items),
insList.Items)
return nil, nil, fmt.Errorf("checkInstance desired %d, but got %d", len(ids), len(insList.Items))
}

blog.Infof("checkInstance desired %d, response %d", len(ids), len(insList.Items))

for _, in := range insList.Items {
if len(in.NetworkInterfaces[0].NetworkIP) == 0 {
failed = append(failed, in.Name)
blog.Warnf("checkInstance[%s] IP is still not distributed", in.Name)
continue
}
running = append(running, in.Name)
}

successIns = running
failureIns = failed
}

return successIns, failureIns, nil
}

// transInstancesToNode record success nodes to cm DB
Expand All @@ -314,17 +366,6 @@ func transInstancesToNode(ctx context.Context, instanceNames []string, info *clo
nodeIPs = make([]string, 0)
err error
)
client, err := api.NewComputeServiceClient(info.CmOption)
if err != nil {
blog.Errorf("transInstanceIDsToNodes create ComputeServiceClient failed, %s", err.Error())
return nil, err
}

err = checkInstance(client, instanceNames)
if err != nil {
blog.Errorf("transInstanceIDsToNodes checkInstance failed, %s", err.Error())
return nil, err
}

taskID := cloudprovider.GetTaskIDFromContext(ctx)
err = retry.Do(func() error {
Expand Down Expand Up @@ -451,7 +492,11 @@ func checkClusterInstanceStatus(ctx context.Context, info *cloudprovider.CloudDe

// set cluster node status
for _, n := range addFailureNodes {
err = cloudprovider.UpdateNodeStatusByInstanceID(n, common.StatusAddNodesFailed)
node, err := cloudprovider.GetStorageModel().GetNodeByName(ctx, info.Cluster.ClusterID, n)
if err != nil {
blog.Errorf("checkClusterInstanceStatus[%s] GetNodeByName[%s] failed: %v", taskID, n, err)
}
err = cloudprovider.UpdateNodeStatusByInstanceID(node.NodeID, common.StatusAddNodesFailed)
if err != nil {
blog.Errorf("checkClusterInstanceStatus[%s] UpdateNodeStatusByInstanceID[%s] failed: %v", taskID, n, err)
}
Expand Down

0 comments on commit d5d29e4

Please sign in to comment.