Skip to content

Commit

Permalink
node add ipv6 address & select only ipv4 address for node internal ip
Browse files Browse the repository at this point in the history
Signed-off-by: xiayu.lyt <lyt1999131@163.com>
  • Loading branch information
Lyt99 committed Mar 19, 2024
1 parent d25234d commit a46aec1
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 18 deletions.
11 changes: 11 additions & 0 deletions pkg/controller/helper/net_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package helper

import "strings"

func IsIPv4(address string) bool {
return strings.Count(address, ":") < 2
}

func IsIPv6(address string) bool {
return strings.Count(address, ":") >= 2
}
120 changes: 108 additions & 12 deletions pkg/controller/helper/node_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
Expand Down Expand Up @@ -89,6 +90,112 @@ func PatchM(mclient client.Client, target client.Object, getter func(runtime.Obj
return nil
}

func PatchNodeStatus(mclient client.Client, target *v1.Node, getter func(*v1.Node) (*v1.Node, error)) error {
err := mclient.Get(
context.TODO(),
client.ObjectKey{
Name: target.GetName(),
}, target,
)
if err != nil {
return fmt.Errorf("get origin object: %s", err.Error())
}

ntarget, err := getter(target.DeepCopy())
if err != nil {
return fmt.Errorf("get object diff patch: %s", err.Error())
}

diffTarget := ntarget
manuallyPatchAddresses := (len(target.Status.Addresses) > 0) &&
!equality.Semantic.DeepEqual(target.Status.Addresses, ntarget.Status.Addresses)
if manuallyPatchAddresses {
diffTarget = diffTarget.DeepCopy()
diffTarget.Status.Addresses = target.Status.Addresses
}

oldData, err := json.Marshal(target)
if err != nil {
return fmt.Errorf("ensure marshal: %s", err.Error())
}
newData, err := json.Marshal(diffTarget)
if err != nil {
return fmt.Errorf("ensure marshal: %s", err.Error())
}
patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, target)
if patchErr != nil {
return fmt.Errorf("create merge patch: %s", patchErr.Error())
}

if string(patchBytes) == "{}" && !manuallyPatchAddresses {
return nil
}

if manuallyPatchAddresses {
patchBytes, err = fixupPatchForNodeStatusAddresses(patchBytes, ntarget.Status.Addresses)
if err != nil {
return fmt.Errorf("fixup patch for node status addresses: %s", err.Error())
}
}

klog.Infof("try to patch node status %s/%s, %s ", target.GetNamespace(), target.GetName(), string(patchBytes))

return mclient.Status().Patch(
context.TODO(), ntarget,
client.RawPatch(types.StrategicMergePatchType, patchBytes))
}

// fixupPatchForNodeStatusAddresses adds a replace-strategy patch for Status.Addresses to
// the existing patch
func fixupPatchForNodeStatusAddresses(patchBytes []byte, addresses []v1.NodeAddress) ([]byte, error) {
// Given patchBytes='{"status": {"conditions": [ ... ], "phase": ...}}' and
// addresses=[{"type": "InternalIP", "address": "10.0.0.1"}], we need to generate:
//
// {
// "status": {
// "conditions": [ ... ],
// "phase": ...,
// "addresses": [
// {
// "type": "InternalIP",
// "address": "10.0.0.1"
// },
// {
// "$patch": "replace"
// }
// ]
// }
// }

var patchMap map[string]interface{}
if err := json.Unmarshal(patchBytes, &patchMap); err != nil {
return nil, err
}

addrBytes, err := json.Marshal(addresses)
if err != nil {
return nil, err
}
var addrArray []interface{}
if err := json.Unmarshal(addrBytes, &addrArray); err != nil {
return nil, err
}
addrArray = append(addrArray, map[string]interface{}{"$patch": "replace"})

status := patchMap["status"]
if status == nil {
status = map[string]interface{}{}
patchMap["status"] = status
}
statusMap, ok := status.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("unexpected data in patch")
}
statusMap["addresses"] = addrArray

return json.Marshal(patchMap)
}

func FindCondition(conds []v1.NodeCondition, conditionType v1.NodeConditionType) (*v1.NodeCondition, bool) {
var retCon *v1.NodeCondition
for i := range conds {
Expand Down Expand Up @@ -178,23 +285,12 @@ func IsNodeExcludeFromLoadBalancer(node *v1.Node) bool {
return false
}

func IsNodeExcludeFromEdgeLoadBalancer(node *v1.Node) bool {
if _, exclude := node.Labels[LabelNodeExcludeBalancer]; exclude {
return true
}

if _, exclude := node.Labels[LabelNodeExcludeBalancerDeprecated]; exclude {
return true
}
return false
}

func GetNodeInternalIP(node *v1.Node) (string, error) {
if len(node.Status.Addresses) == 0 {
return "", fmt.Errorf("node %s do not contains addresses", node.Name)
}
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeInternalIP {
if addr.Type == v1.NodeInternalIP && IsIPv4(addr.Address) {
return addr.Address, nil
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/node/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,20 +244,20 @@ func (m *ReconcileNode) syncNode(nodes []corev1.Node) error {
cloudNode.Addresses = []corev1.NodeAddress{*nodeIP}
}

diff := func(copy runtime.Object) (client.Object, error) {
nins := copy.(*corev1.Node)
nins.Status.Addresses = cloudNode.Addresses
return nins, nil
statusDiff := func(copy *corev1.Node) (*corev1.Node, error) {
copy.Status.Addresses = cloudNode.Addresses
return copy, nil
}
err := helper.PatchM(m.client, node, diff, helper.PatchStatus)

err := helper.PatchNodeStatus(m.client, node, statusDiff)
if err != nil {
log.Error(err, "patch node address error, wait for next retry", "node", node.Name)
m.record.Event(
nodeRef, corev1.EventTypeWarning, helper.FailedSyncNode, err.Error(),
)
}

diff = func(copy runtime.Object) (client.Object, error) {
diff := func(copy runtime.Object) (client.Object, error) {
nins := copy.(*corev1.Node)
setFields(nins, cloudNode, false)
return nins, nil
Expand Down
20 changes: 20 additions & 0 deletions pkg/provider/alibaba/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"strings"

Expand Down Expand Up @@ -120,6 +121,7 @@ func (e *ECSProvider) getInstances(ids []string, region string) ([]ecs.Instance,
req.InstanceIds = string(bids)
req.NextToken = ""
req.MaxResults = requests.NewInteger(50)
req.AdditionalAttributes = &[]string{"NETWORK_PRIMARY_ENI_IP"}

var ecsInstances []ecs.Instance
for {
Expand Down Expand Up @@ -227,6 +229,24 @@ func findAddress(instance *ecs.Instance) []v1.NodeAddress {
}
}

if utilfeature.DefaultFeatureGate.Enabled(ctrlCfg.IPv6DualStack) {
if len(instance.NetworkInterfaces.NetworkInterface) > 0 {
var primary *ecs.NetworkInterface
for i := range instance.NetworkInterfaces.NetworkInterface {
if instance.NetworkInterfaces.NetworkInterface[i].Type == "Primary" {
primary = &instance.NetworkInterfaces.NetworkInterface[i]
break
}
}
if primary != nil {
// add all ipv6 address of primary network interface to node address
for _, addr := range primary.Ipv6Sets.Ipv6Set {
addrs = append(addrs, v1.NodeAddress{Type: v1.NodeInternalIP, Address: addr.Ipv6Address})
}
}
}
}

return addrs
}

Expand Down

0 comments on commit a46aec1

Please sign in to comment.