From a46aec12794450c2efcab3558a3088abcf3c935f Mon Sep 17 00:00:00 2001 From: "xiayu.lyt" Date: Tue, 24 Oct 2023 14:47:15 +0800 Subject: [PATCH] node add ipv6 address & select only ipv4 address for node internal ip Signed-off-by: xiayu.lyt --- pkg/controller/helper/net_utils.go | 11 +++ pkg/controller/helper/node_utils.go | 120 ++++++++++++++++++++++--- pkg/controller/node/node_controller.go | 12 +-- pkg/provider/alibaba/ecs/ecs.go | 20 +++++ 4 files changed, 145 insertions(+), 18 deletions(-) create mode 100644 pkg/controller/helper/net_utils.go diff --git a/pkg/controller/helper/net_utils.go b/pkg/controller/helper/net_utils.go new file mode 100644 index 000000000..f013980f6 --- /dev/null +++ b/pkg/controller/helper/net_utils.go @@ -0,0 +1,11 @@ +package helper + +import "strings" + +func IsIPv4(address string) bool { + return strings.Count(address, ":") < 2 +} + +func IsIPv6(address string) bool { + return strings.Count(address, ":") >= 2 +} diff --git a/pkg/controller/helper/node_utils.go b/pkg/controller/helper/node_utils.go index 3c06b38d9..289158712 100644 --- a/pkg/controller/helper/node_utils.go +++ b/pkg/controller/helper/node_utils.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -89,6 +90,112 @@ func PatchM(mclient client.Client, target client.Object, getter func(runtime.Obj return nil } +func PatchNodeStatus(mclient client.Client, target *v1.Node, getter func(*v1.Node) (*v1.Node, error)) error { + err := mclient.Get( + context.TODO(), + client.ObjectKey{ + Name: target.GetName(), + }, target, + ) + if err != nil { + return fmt.Errorf("get origin object: %s", err.Error()) + } + + ntarget, err := getter(target.DeepCopy()) + if err != nil { + return fmt.Errorf("get object diff patch: %s", err.Error()) + } + + diffTarget := ntarget + manuallyPatchAddresses := (len(target.Status.Addresses) > 0) && + !equality.Semantic.DeepEqual(target.Status.Addresses, ntarget.Status.Addresses) + if manuallyPatchAddresses { + diffTarget = diffTarget.DeepCopy() + diffTarget.Status.Addresses = target.Status.Addresses + } + + oldData, err := json.Marshal(target) + if err != nil { + return fmt.Errorf("ensure marshal: %s", err.Error()) + } + newData, err := json.Marshal(diffTarget) + if err != nil { + return fmt.Errorf("ensure marshal: %s", err.Error()) + } + patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, target) + if patchErr != nil { + return fmt.Errorf("create merge patch: %s", patchErr.Error()) + } + + if string(patchBytes) == "{}" && !manuallyPatchAddresses { + return nil + } + + if manuallyPatchAddresses { + patchBytes, err = fixupPatchForNodeStatusAddresses(patchBytes, ntarget.Status.Addresses) + if err != nil { + return fmt.Errorf("fixup patch for node status addresses: %s", err.Error()) + } + } + + klog.Infof("try to patch node status %s/%s, %s ", target.GetNamespace(), target.GetName(), string(patchBytes)) + + return mclient.Status().Patch( + context.TODO(), ntarget, + client.RawPatch(types.StrategicMergePatchType, patchBytes)) +} + +// fixupPatchForNodeStatusAddresses adds a replace-strategy patch for Status.Addresses to +// the existing patch +func fixupPatchForNodeStatusAddresses(patchBytes []byte, addresses []v1.NodeAddress) ([]byte, error) { + // Given patchBytes='{"status": {"conditions": [ ... ], "phase": ...}}' and + // addresses=[{"type": "InternalIP", "address": "10.0.0.1"}], we need to generate: + // + // { + // "status": { + // "conditions": [ ... ], + // "phase": ..., + // "addresses": [ + // { + // "type": "InternalIP", + // "address": "10.0.0.1" + // }, + // { + // "$patch": "replace" + // } + // ] + // } + // } + + var patchMap map[string]interface{} + if err := json.Unmarshal(patchBytes, &patchMap); err != nil { + return nil, err + } + + addrBytes, err := json.Marshal(addresses) + if err != nil { + return nil, err + } + var addrArray []interface{} + if err := json.Unmarshal(addrBytes, &addrArray); err != nil { + return nil, err + } + addrArray = append(addrArray, map[string]interface{}{"$patch": "replace"}) + + status := patchMap["status"] + if status == nil { + status = map[string]interface{}{} + patchMap["status"] = status + } + statusMap, ok := status.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("unexpected data in patch") + } + statusMap["addresses"] = addrArray + + return json.Marshal(patchMap) +} + func FindCondition(conds []v1.NodeCondition, conditionType v1.NodeConditionType) (*v1.NodeCondition, bool) { var retCon *v1.NodeCondition for i := range conds { @@ -178,23 +285,12 @@ func IsNodeExcludeFromLoadBalancer(node *v1.Node) bool { return false } -func IsNodeExcludeFromEdgeLoadBalancer(node *v1.Node) bool { - if _, exclude := node.Labels[LabelNodeExcludeBalancer]; exclude { - return true - } - - if _, exclude := node.Labels[LabelNodeExcludeBalancerDeprecated]; exclude { - return true - } - return false -} - func GetNodeInternalIP(node *v1.Node) (string, error) { if len(node.Status.Addresses) == 0 { return "", fmt.Errorf("node %s do not contains addresses", node.Name) } for _, addr := range node.Status.Addresses { - if addr.Type == v1.NodeInternalIP { + if addr.Type == v1.NodeInternalIP && IsIPv4(addr.Address) { return addr.Address, nil } } diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/node/node_controller.go index 93116b181..db4a7afa7 100644 --- a/pkg/controller/node/node_controller.go +++ b/pkg/controller/node/node_controller.go @@ -244,12 +244,12 @@ func (m *ReconcileNode) syncNode(nodes []corev1.Node) error { cloudNode.Addresses = []corev1.NodeAddress{*nodeIP} } - diff := func(copy runtime.Object) (client.Object, error) { - nins := copy.(*corev1.Node) - nins.Status.Addresses = cloudNode.Addresses - return nins, nil + statusDiff := func(copy *corev1.Node) (*corev1.Node, error) { + copy.Status.Addresses = cloudNode.Addresses + return copy, nil } - err := helper.PatchM(m.client, node, diff, helper.PatchStatus) + + err := helper.PatchNodeStatus(m.client, node, statusDiff) if err != nil { log.Error(err, "patch node address error, wait for next retry", "node", node.Name) m.record.Event( @@ -257,7 +257,7 @@ func (m *ReconcileNode) syncNode(nodes []corev1.Node) error { ) } - diff = func(copy runtime.Object) (client.Object, error) { + diff := func(copy runtime.Object) (client.Object, error) { nins := copy.(*corev1.Node) setFields(nins, cloudNode, false) return nins, nil diff --git a/pkg/provider/alibaba/ecs/ecs.go b/pkg/provider/alibaba/ecs/ecs.go index 1f5233fc6..e5a12058c 100644 --- a/pkg/provider/alibaba/ecs/ecs.go +++ b/pkg/provider/alibaba/ecs/ecs.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" "strings" @@ -120,6 +121,7 @@ func (e *ECSProvider) getInstances(ids []string, region string) ([]ecs.Instance, req.InstanceIds = string(bids) req.NextToken = "" req.MaxResults = requests.NewInteger(50) + req.AdditionalAttributes = &[]string{"NETWORK_PRIMARY_ENI_IP"} var ecsInstances []ecs.Instance for { @@ -227,6 +229,24 @@ func findAddress(instance *ecs.Instance) []v1.NodeAddress { } } + if utilfeature.DefaultFeatureGate.Enabled(ctrlCfg.IPv6DualStack) { + if len(instance.NetworkInterfaces.NetworkInterface) > 0 { + var primary *ecs.NetworkInterface + for i := range instance.NetworkInterfaces.NetworkInterface { + if instance.NetworkInterfaces.NetworkInterface[i].Type == "Primary" { + primary = &instance.NetworkInterfaces.NetworkInterface[i] + break + } + } + if primary != nil { + // add all ipv6 address of primary network interface to node address + for _, addr := range primary.Ipv6Sets.Ipv6Set { + addrs = append(addrs, v1.NodeAddress{Type: v1.NodeInternalIP, Address: addr.Ipv6Address}) + } + } + } + } + return addrs }