From e31685d71bc8b36d34527802d8b9832002be514f Mon Sep 17 00:00:00 2001 From: shane Date: Thu, 4 May 2023 12:24:38 +0000 Subject: [PATCH] feat: natgw qos --- charts/templates/ovn-CR.yaml | 1 + dist/images/install.sh | 311 ++++++++++++++++++ dist/images/vpcnatgateway/nat-gateway.sh | 141 ++++++-- pkg/apis/kubeovn/v1/status.go | 10 + pkg/apis/kubeovn/v1/types.go | 15 +- pkg/apis/kubeovn/v1/zz_generated.deepcopy.go | 35 ++ .../kubeovn/v1/fake/fake_vpcnatgateway.go | 11 + .../typed/kubeovn/v1/vpcnatgateway.go | 16 + pkg/controller/init.go | 2 +- pkg/controller/qos_policy.go | 76 ++++- pkg/controller/vpc_nat_gateway.go | 258 ++++++++++++++- yamls/crd.yaml | 309 +++++++++++++++++ yamls/ovn-dpdk.yaml | 1 + yamls/ovn-ha.yaml | 1 + 14 files changed, 1138 insertions(+), 49 deletions(-) diff --git a/charts/templates/ovn-CR.yaml b/charts/templates/ovn-CR.yaml index 5c773030707..1fd647b584b 100644 --- a/charts/templates/ovn-CR.yaml +++ b/charts/templates/ovn-CR.yaml @@ -11,6 +11,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips diff --git a/dist/images/install.sh b/dist/images/install.sh index b0990b83341..99dfd0bf9a0 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -385,10 +385,317 @@ spec: name: v1 served: true storage: true + subresources: + status: {} schema: openAPIV3Schema: type: object properties: + status: + type: object + properties: + externalSubnets: + items: + type: string + type: array + selector: + type: array + items: + type: string + qosPolicy: + type: string + tolerations: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + enum: + - Equal + - Exists + value: + type: string + effect: + type: string + enum: + - NoExecute + - NoSchedule + - PreferNoSchedule + tolerationSeconds: + type: integer + affinity: + properties: + nodeAffinity: + properties: + preferredDuringSchedulingIgnoredDuringExecution: + items: + properties: + preference: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + properties: + nodeSelectorTerms: + items: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object + podAffinity: + properties: + preferredDuringSchedulingIgnoredDuringExecution: + items: + properties: + podAffinityTerm: + properties: + labelSelector: + properties: + matchExpressions: + items: + properties: + key: + type: string + x-kubernetes-patch-strategy: merge + x-kubernetes-patch-merge-key: key + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + namespaces: + items: + type: string + type: array + topologyKey: + type: string + required: + - topologyKey + type: object + weight: + format: int32 + type: integer + required: + - podAffinityTerm + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + items: + properties: + labelSelector: + properties: + matchExpressions: + items: + properties: + key: + type: string + x-kubernetes-patch-strategy: merge + x-kubernetes-patch-merge-key: key + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + namespaces: + items: + type: string + type: array + topologyKey: + type: string + required: + - topologyKey + type: object + type: array + type: object + podAntiAffinity: + properties: + preferredDuringSchedulingIgnoredDuringExecution: + items: + properties: + podAffinityTerm: + properties: + labelSelector: + properties: + matchExpressions: + items: + properties: + key: + type: string + x-kubernetes-patch-strategy: merge + x-kubernetes-patch-merge-key: key + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + namespaces: + items: + type: string + type: array + topologyKey: + type: string + required: + - topologyKey + type: object + weight: + format: int32 + type: integer + required: + - podAffinityTerm + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + items: + properties: + labelSelector: + properties: + matchExpressions: + items: + properties: + key: + type: string + x-kubernetes-patch-strategy: merge + x-kubernetes-patch-merge-key: key + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + namespaces: + items: + type: string + type: array + topologyKey: + type: string + required: + - topologyKey + type: object + type: array + type: object + type: object spec: type: object properties: @@ -406,6 +713,8 @@ spec: type: array items: type: string + qosPolicy: + type: string tolerations: type: array items: @@ -2322,6 +2631,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips @@ -2835,6 +3145,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips diff --git a/dist/images/vpcnatgateway/nat-gateway.sh b/dist/images/vpcnatgateway/nat-gateway.sh index abdcb7aa7bd..3b44f238686 100644 --- a/dist/images/vpcnatgateway/nat-gateway.sh +++ b/dist/images/vpcnatgateway/nat-gateway.sh @@ -242,11 +242,13 @@ function del_dnat() { # example usage: -# delete_tc_filter "1:0" "192.168.1.1" "src" -function delete_tc_filter() { - qdisc_id=$1 - v4ip=$2 - direction=$3 +# delete_tc_u32_filter "net1" "1:0" "192.168.1.1" "src" +function delete_tc_u32_filter() { + dev=$1 + qdisc_id=$2 + cidr=$3 + matchDirection=$4 + # tc -p -s -d filter show dev net1 parent $qdisc_id # filter protocol ip pref 10 u32 chain 0 @@ -259,16 +261,16 @@ function delete_tc_filter() { # Sent 0 bytes 0 pkts (dropped 0, overlimits 0) # get the corresponding filterID by the EIP, and use the filterID to delete the corresponding filtering rule. - ipList=$(tc -p -s -d filter show dev net1 parent $qdisc_id | grep "match IP " | awk '{print $4}') + ipList=$(tc -p -s -d filter show dev $dev parent $qdisc_id | grep -E "match IP src|dst ([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}") i=0 - for line in $ipList; do + echo "$ipList" | while read line; do i=$((i+1)) - if echo "$line" | grep $v4ip; then - result=$(tc -p -s -d filter show dev net1 parent $qdisc_id | grep "filter protocol ip pref [0-9]\+ u32 \(fh\|chain [0-9]\+ fh\) \(\w\+::\w\+\) *" | awk '{print $5,$10}' | sed -n $i"p") + if echo "$line" | grep "$matchDirection $cidr"; then + result=$(tc -p -s -d filter show dev $dev parent $qdisc_id | grep "filter protocol ip pref [0-9]\+ u32 \(fh\|chain [0-9]\+ fh\) \(\w\+::\w\+\) *" | awk '{print $5,$10}' | sed -n $i"p") arr=($result) pref=${arr[0]} filterID=${arr[1]} - exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio $pref handle $filterID u32" + exec_cmd "tc filter del dev $dev parent $qdisc_id protocol ip prio $pref handle $filterID u32" break fi done @@ -285,16 +287,17 @@ function eip_ingress_qos_add() { priority=${arr[1]} rate=${arr[2]} burst=${arr[3]} - direction="dst" - tc qdisc add dev net1 ingress 2>/dev/nul || true + dev="net1" + matchDirection="dst" + tc qdisc add dev $dev ingress 2>/dev/nul || true # get qdisc id - qdisc_id=$(tc qdisc show dev net1 ingress | awk '{print $3}') + qdisc_id=$(tc qdisc show dev $dev ingress | awk '{print $3}') # del old filter - tc -p -s -d filter show dev net1 parent $qdisc_id | grep -w $v4ip + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w $v4ip if [ "$?" -eq 0 ];then - delete_tc_filter $qdisc_id $v4ip $direction + delete_tc_u32_filter $dev $qdisc_id $v4ip $matchDirection fi - exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio $priority u32 match ip $direction $v4ip police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" + exec_cmd "tc filter add dev $dev parent $qdisc_id protocol ip prio $priority u32 match ip $matchDirection $v4ip police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" done } @@ -310,14 +313,86 @@ function eip_egress_qos_add() { rate=${arr[2]} burst=${arr[3]} qdisc_id="1:0" - direction="src" - tc qdisc add dev net1 root handle $qdisc_id htb 2>/dev/nul || true + matchDirection="src" + dev="net1" + tc qdisc add dev $dev root handle $qdisc_id htb 2>/dev/nul || true # del old filter - tc -p -s -d filter show dev net1 parent $qdisc_id | grep -w $v4ip + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w $v4ip if [ "$?" -eq 0 ];then - delete_tc_filter $qdisc_id $v4ip $direction + delete_tc_u32_filter $dev $qdisc_id $v4ip $matchDirection + fi + exec_cmd "tc filter add dev $dev parent $qdisc_id protocol ip prio $priority u32 match ip $matchDirection $v4ip police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" + done +} + +function qos_add() { + for rule in $@ + do + IFS=',' read -r -a arr <<< "$rule" + local qdiscType=(${arr[0]}) + local dev=${arr[1]} + local priority=${arr[2]} + local classifierType=${arr[3]} + local matchType=${arr[4]} + local matchDirection=${arr[5]} + local cidr=${arr[6]} + local rate=${arr[7]} + local burst=${arr[8]} + + if [ "$qdiscType" == "ingress" ];then + tc qdisc add dev $dev ingress 2>/dev/null || true + # get qdisc id + qdisc_id=$(tc qdisc show dev $dev ingress | awk '{print $3}') + elif [ "$qdiscType" == "egress" ];then + qdisc_id="1:0" + tc qdisc add dev $dev root handle $qdisc_id htb 2>/dev/null || true + fi + + if [ "$classifierType" == "u32" ];then + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w $cidr + if [ "$?" -ne 0 ];then + exec_cmd "tc filter add dev $dev parent $qdisc_id protocol ip prio $priority u32 match $matchType $matchDirection $cidr police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" + fi + elif [ "$classifierType" == "matchall" ];then + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w matchall + if [ "$?" -ne 0 ];then + exec_cmd "tc filter add dev $dev parent $qdisc_id protocol ip prio $priority matchall action police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" + fi + fi + done +} + +function qos_del() { + for rule in $@ + do + IFS=',' read -r -a arr <<< "$rule" + local qdiscType=(${arr[0]}) + local dev=${arr[1]} + local priority=${arr[2]} + local classifierType=${arr[3]} + local matchType=${arr[4]} + local matchDirection=${arr[5]} + local cidr=${arr[6]} + local rate=${arr[7]} + local burst=${arr[8]} + + if [ "$qdiscType" == "ingress" ];then + qdisc_id=$(tc qdisc show dev $dev ingress | awk '{print $3}') + if [ -z "$qdisc_id" ]; then + exit 0 + fi + elif [ "$qdiscType" == "egress" ];then + qdisc_id="1:0" + fi + # if qdisc_id is empty, this means ingress qdisc is not added, so we don't need to delete filter. + if [ "$classifierType" == "u32" ];then + delete_tc_u32_filter $dev $qdisc_id $cidr $matchDirection + elif [ "$classifierType" == "matchall" ];then + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w matchall + if [ "$?" -eq 0 ];then + exec_cmd "tc filter del dev $dev parent $qdisc_id protocol ip prio $priority matchall" + fi fi - exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio $priority u32 match ip $direction $v4ip police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" done } @@ -325,12 +400,13 @@ function eip_ingress_qos_del() { for rule in $@ do arr=(${rule//,/ }) - v4ip=(${arr[0]//\// }) - direction="dst" - qdisc_id=$(tc qdisc show dev net1 ingress | awk '{print $3}') + cidr=(${arr[0]//\// }) + matchDirection="dst" + dev="net1" + qdisc_id=$(tc qdisc show dev $dev ingress | awk '{print $3}') # if qdisc_id is empty, this means ingress qdisc is not added, so we don't need to delete filter. if [ -n "$qdisc_id" ]; then - delete_tc_filter $qdisc_id $v4ip $direction + delete_tc_u32_filter $dev $qdisc_id $cidr $matchDirection fi done } @@ -339,10 +415,11 @@ function eip_egress_qos_del() { for rule in $@ do arr=(${rule//,/ }) - v4ip=(${arr[0]//\// }) - direction="src" + cidr=(${arr[0]//\// }) + matchDirection="src" qdisc_id="1:0" - delete_tc_filter $qdisc_id $v4ip $direction + dev="net1" + delete_tc_u32_filter $dev $qdisc_id $cidr $matchDirection done } @@ -422,6 +499,14 @@ case $opt in echo "eip-egress-qos-del $rules" eip_egress_qos_del $rules ;; + qos-add) + echo "qos-add $rules" + qos_add $rules + ;; + qos-del) + echo "qos-del $rules" + qos_del $rules + ;; *) echo "Usage: $0 [init|subnet-route-add|subnet-route-del|eip-add|eip-del|floating-ip-add|floating-ip-del|dnat-add|dnat-del|snat-add|snat-del] ..." exit 1 diff --git a/pkg/apis/kubeovn/v1/status.go b/pkg/apis/kubeovn/v1/status.go index cae4d2968ab..28f87ff69bf 100644 --- a/pkg/apis/kubeovn/v1/status.go +++ b/pkg/apis/kubeovn/v1/status.go @@ -137,3 +137,13 @@ func (qoss *QoSPolicyStatus) Bytes() ([]byte, error) { klog.V(5).Info("status body", newStr) return []byte(newStr), nil } + +func (vns *VpcNatStatus) Bytes() ([]byte, error) { + bytes, err := json.Marshal(vns) + if err != nil { + return nil, err + } + newStr := fmt.Sprintf(`{"status": %s}`, string(bytes)) + klog.V(5).Info("status body", newStr) + return []byte(newStr), nil +} diff --git a/pkg/apis/kubeovn/v1/types.go b/pkg/apis/kubeovn/v1/types.go index cd57b7b1318..f6aa92b3dc2 100644 --- a/pkg/apis/kubeovn/v1/types.go +++ b/pkg/apis/kubeovn/v1/types.go @@ -44,7 +44,8 @@ var ( type QoSPolicyBindingType string const ( - QoSBindingTypeEIP QoSPolicyBindingType = "EIP" + QoSBindingTypeEIP QoSPolicyBindingType = "EIP" + QoSBindingTypeNatGw QoSPolicyBindingType = "NATGW" ) type QoSPolicyRuleDirection string @@ -479,7 +480,8 @@ type VpcNatGateway struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec VpcNatSpec `json:"spec"` + Spec VpcNatSpec `json:"spec"` + Status VpcNatStatus `json:"status,omitempty"` } type VpcNatSpec struct { @@ -490,6 +492,15 @@ type VpcNatSpec struct { Selector []string `json:"selector"` Tolerations []corev1.Toleration `json:"tolerations"` Affinity corev1.Affinity `json:"affinity"` + QoSPolicy string `json:"qosPolicy"` +} + +type VpcNatStatus struct { + QoSPolicy string `json:"qosPolicy" patchStrategy:"merge"` + ExternalSubnets []string `json:"externalSubnets" patchStrategy:"merge"` + Selector []string `json:"selector" patchStrategy:"merge"` + Tolerations []corev1.Toleration `json:"tolerations" patchStrategy:"merge"` + Affinity corev1.Affinity `json:"affinity" patchStrategy:"merge"` } // +genclient diff --git a/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go b/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go index cd2a8a1781f..571bf15971c 100644 --- a/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go @@ -2329,6 +2329,7 @@ func (in *VpcNatGateway) DeepCopyInto(out *VpcNatGateway) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) return } @@ -2417,6 +2418,40 @@ func (in *VpcNatSpec) DeepCopy() *VpcNatSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VpcNatStatus) DeepCopyInto(out *VpcNatStatus) { + *out = *in + if in.ExternalSubnets != nil { + in, out := &in.ExternalSubnets, &out.ExternalSubnets + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Selector != nil { + in, out := &in.Selector, &out.Selector + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]corev1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + in.Affinity.DeepCopyInto(&out.Affinity) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VpcNatStatus. +func (in *VpcNatStatus) DeepCopy() *VpcNatStatus { + if in == nil { + return nil + } + out := new(VpcNatStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VpcPeering) DeepCopyInto(out *VpcPeering) { *out = *in diff --git a/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_vpcnatgateway.go b/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_vpcnatgateway.go index 2125b85d99e..0c379f43660 100644 --- a/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_vpcnatgateway.go +++ b/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_vpcnatgateway.go @@ -96,6 +96,17 @@ func (c *FakeVpcNatGateways) Update(ctx context.Context, vpcNatGateway *kubeovnv return obj.(*kubeovnv1.VpcNatGateway), err } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeVpcNatGateways) UpdateStatus(ctx context.Context, vpcNatGateway *kubeovnv1.VpcNatGateway, opts v1.UpdateOptions) (*kubeovnv1.VpcNatGateway, error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateSubresourceAction(vpcnatgatewaysResource, "status", vpcNatGateway), &kubeovnv1.VpcNatGateway{}) + if obj == nil { + return nil, err + } + return obj.(*kubeovnv1.VpcNatGateway), err +} + // Delete takes name of the vpcNatGateway and deletes it. Returns an error if one occurs. func (c *FakeVpcNatGateways) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { _, err := c.Fake. diff --git a/pkg/client/clientset/versioned/typed/kubeovn/v1/vpcnatgateway.go b/pkg/client/clientset/versioned/typed/kubeovn/v1/vpcnatgateway.go index 03d08658f38..074523ed533 100644 --- a/pkg/client/clientset/versioned/typed/kubeovn/v1/vpcnatgateway.go +++ b/pkg/client/clientset/versioned/typed/kubeovn/v1/vpcnatgateway.go @@ -40,6 +40,7 @@ type VpcNatGatewaysGetter interface { type VpcNatGatewayInterface interface { Create(ctx context.Context, vpcNatGateway *v1.VpcNatGateway, opts metav1.CreateOptions) (*v1.VpcNatGateway, error) Update(ctx context.Context, vpcNatGateway *v1.VpcNatGateway, opts metav1.UpdateOptions) (*v1.VpcNatGateway, error) + UpdateStatus(ctx context.Context, vpcNatGateway *v1.VpcNatGateway, opts metav1.UpdateOptions) (*v1.VpcNatGateway, error) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.VpcNatGateway, error) @@ -128,6 +129,21 @@ func (c *vpcNatGateways) Update(ctx context.Context, vpcNatGateway *v1.VpcNatGat return } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *vpcNatGateways) UpdateStatus(ctx context.Context, vpcNatGateway *v1.VpcNatGateway, opts metav1.UpdateOptions) (result *v1.VpcNatGateway, err error) { + result = &v1.VpcNatGateway{} + err = c.client.Put(). + Resource("vpc-nat-gateways"). + Name(vpcNatGateway.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(vpcNatGateway). + Do(ctx). + Into(result) + return +} + // Delete takes name of the vpcNatGateway and deletes it. Returns an error if one occurs. func (c *vpcNatGateways) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { return c.client.Delete(). diff --git a/pkg/controller/init.go b/pkg/controller/init.go index 48f864470de..9fdbe2dd460 100644 --- a/pkg/controller/init.go +++ b/pkg/controller/init.go @@ -707,7 +707,7 @@ func (c *Controller) initSyncCrdVpcNatGw() error { return err } for _, gw := range gws { - if err := c.updateCrdNatGw(gw.Name); err != nil { + if err := c.updateCrdNatGwLabels(gw.Name, ""); err != nil { klog.Errorf("failed to update nat gw: %v", gw.Name, err) return err } diff --git a/pkg/controller/qos_policy.go b/pkg/controller/qos_policy.go index 7f97d0dbb1f..3e7fde2d99e 100644 --- a/pkg/controller/qos_policy.go +++ b/pkg/controller/qos_policy.go @@ -3,8 +3,10 @@ package controller import ( "context" "fmt" + "net" "reflect" "sort" + "strings" kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" "github.com/kubeovn/kube-ovn/pkg/util" @@ -337,7 +339,46 @@ func (c *Controller) reconcileEIPBandtithLimitRules( return nil } +func validateIPMatchValue(matachValue string) bool { + parts := strings.Split(matachValue, " ") + if len(parts) != 2 { + klog.Errorf("invalid ip MatchValue %s", matachValue) + return false + } + + direction := parts[0] + if direction != "src" && direction != "dst" { + klog.Errorf("invalid direction %s, must be src or dst", direction) + return false + } + + cidr := parts[1] + if _, _, err := net.ParseCIDR(cidr); err != nil { + klog.Errorf("invalid cidr %s", cidr) + return false + } + // invalid cidr + return true +} + func (c *Controller) validateQosPolicy(qosPolicy *kubeovnv1.QoSPolicy) error { + var err error + if qosPolicy.Spec.BandwidthLimitRules != nil { + for _, rule := range qosPolicy.Spec.BandwidthLimitRules { + if rule.MatchType == "ip" { + if !validateIPMatchValue(rule.MatchValue) { + err = fmt.Errorf("invalid ip MatchValue %s", rule.MatchValue) + klog.Error(err) + return err + } + } + } + } + if !qosPolicy.Spec.Shared && qosPolicy.Spec.BindingType == kubeovnv1.QoSBindingTypeNatGw { + err = fmt.Errorf("qos policy %s is not shared, but binding to nat gateway", qosPolicy.Name) + klog.Error(err) + return err + } return nil } @@ -354,17 +395,34 @@ func (c *Controller) handleUpdateQoSPolicy(key string) error { } // should delete if !cachedQos.DeletionTimestamp.IsZero() { - eips, err := c.iptablesEipsLister.List( - labels.SelectorFromSet(labels.Set{util.QoSLabel: key})) - if err != nil { - klog.Errorf("failed to get eip list, %v", err) - return err + if cachedQos.Spec.BindingType == kubeovnv1.QoSBindingTypeEIP { + eips, err := c.iptablesEipsLister.List( + labels.SelectorFromSet(labels.Set{util.QoSLabel: key})) + if err != nil { + klog.Errorf("failed to get eip list, %v", err) + return err + } + if len(eips) != 0 { + err = fmt.Errorf("qos policy %s is being used", key) + klog.Error(err) + return err + } } - if len(eips) != 0 { - err = fmt.Errorf("qos policy %s is being used", key) - klog.Error(err) - return err + + if cachedQos.Spec.BindingType == kubeovnv1.QoSBindingTypeNatGw { + gws, err := c.vpcNatGatewayLister.List( + labels.SelectorFromSet(labels.Set{util.QoSLabel: key})) + if err != nil { + klog.Errorf("failed to get gw list, %v", err) + return err + } + if len(gws) != 0 { + err = fmt.Errorf("qos policy %s is being used", key) + klog.Error(err) + return err + } } + if err = c.handleDelQoSPoliciesFinalizer(key); err != nil { klog.Errorf("failed to handle del finalizer for qos %s, %v", key, err) return err diff --git a/pkg/controller/vpc_nat_gateway.go b/pkg/controller/vpc_nat_gateway.go index 9459eb7a0ff..c3a29733b6b 100644 --- a/pkg/controller/vpc_nat_gateway.go +++ b/pkg/controller/vpc_nat_gateway.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "reflect" "regexp" "strings" "time" @@ -41,6 +42,8 @@ const ( natGwSnatDel = "snat-del" natGwEipIngressQoSAdd = "eip-ingress-qos-add" natGwEipIngressQoSDel = "eip-ingress-qos-del" + QoSAdd = "qos-add" + QoSDel = "qos-del" natGwEipEgressQoSAdd = "eip-egress-qos-add" natGwEipEgressQoSDel = "eip-egress-qos-del" natGwSubnetFipAdd = "floating-ip-add" @@ -118,6 +121,7 @@ func (c *Controller) enqueueUpdateVpcNatGw(old, new interface{}) { utilruntime.HandleError(err) return } + klog.V(3).Infof("enqueue update vpc-nat-gw %s", key) c.addOrUpdateVpcNatGatewayQueue.Add(key) } @@ -128,6 +132,7 @@ func (c *Controller) enqueueDeleteVpcNatGw(obj interface{}) { utilruntime.HandleError(err) return } + klog.V(3).Infof("enqueue del vpc-nat-gw %s", key) c.delVpcNatGatewayQueue.Add(key) } @@ -216,6 +221,27 @@ func (c *Controller) handleDelVpcNatGw(key string) error { return nil } +func isVpcNatGwChanged(gw *kubeovnv1.VpcNatGateway) bool { + + if !reflect.DeepEqual(gw.Spec.ExternalSubnets, gw.Status.ExternalSubnets) { + gw.Status.ExternalSubnets = gw.Spec.ExternalSubnets + return true + } + if !reflect.DeepEqual(gw.Spec.Selector, gw.Status.Selector) { + gw.Status.Selector = gw.Spec.Selector + return true + } + if !reflect.DeepEqual(gw.Spec.Tolerations, gw.Status.Tolerations) { + gw.Status.Tolerations = gw.Spec.Tolerations + return true + } + if !reflect.DeepEqual(gw.Spec.Affinity, gw.Status.Affinity) { + gw.Status.Affinity = gw.Spec.Affinity + return true + } + return false +} + func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { // create nat gw statefulset c.vpcNatGwKeyMutex.Lock(key) @@ -243,6 +269,7 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { // check or create statefulset needToCreate := false + needToUpdate := false oldSts, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace). Get(context.Background(), genNatGwStsName(gw.Name), metav1.GetOptions{}) @@ -253,8 +280,10 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { return err } } - newSts := c.genNatGwStatefulSet(gw, oldSts.DeepCopy()) + if !needToCreate && isVpcNatGwChanged(gw) { + needToUpdate = true + } if needToCreate { // if pod create successfully, will add initVpcNatGatewayQueue @@ -264,15 +293,50 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { klog.Error(err) return err } + if err = c.patchNatGwStatus(key); err != nil { + klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err) + return err + } return nil - } else { + } else if needToUpdate { if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace). Update(context.Background(), newSts, metav1.UpdateOptions{}); err != nil { err := fmt.Errorf("failed to update statefulset '%s', err: %v", newSts.Name, err) klog.Error(err) return err } + if err = c.patchNatGwStatus(key); err != nil { + klog.Errorf("failed to patch nat gw sts status for nat gw %s, %v", key, err) + return err + } + } else { + // check if need to change qos + if gw.Spec.QoSPolicy != gw.Status.QoSPolicy { + if gw.Status.QoSPolicy != "" { + if err = c.execNatGwQoS(gw, gw.Status.QoSPolicy, QoSDel); err != nil { + klog.Errorf("failed to add qos for nat gw %s, %v", key, err) + return err + } + } + if gw.Spec.QoSPolicy != "" { + if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil { + klog.Errorf("failed to del qos for nat gw %s, %v", key, err) + return err + } + } + if err := c.updateCrdNatGwLabels(key, gw.Spec.QoSPolicy); err != nil { + err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err) + klog.Error(err) + return err + } + // if update qos success, will update nat gw status + if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil { + klog.Errorf("failed to patch nat gw qos status for nat gw %s, %v", key, err) + return err + } + } } + return nil } @@ -296,12 +360,6 @@ func (c *Controller) handleInitVpcNatGw(key string) error { return fmt.Errorf("failed to get subnet %s", gw.Spec.Subnet) } - if err := c.updateCrdNatGw(gw.Name); err != nil { - err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err) - klog.Error(err) - return err - } - oriPod, err := c.getNatGwPod(key) if err != nil { err := fmt.Errorf("failed to get nat gw %s pod: %v", gw.Name, err) @@ -325,6 +383,27 @@ func (c *Controller) handleInitVpcNatGw(key string) error { klog.Error(err) return err } + + if gw.Spec.QoSPolicy != "" { + if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil { + klog.Errorf("failed to add qos for nat gw %s, %v", key, err) + return err + } + } + // if update qos success, will update nat gw status + if gw.Spec.QoSPolicy != gw.Status.QoSPolicy { + if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil { + klog.Errorf("failed to patch status for nat gw %s, %v", key, err) + return err + } + } + + if err := c.updateCrdNatGwLabels(gw.Name, gw.Spec.QoSPolicy); err != nil { + err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err) + klog.Error(err) + return err + } + c.updateVpcFloatingIpQueue.Add(key) c.updateVpcDnatQueue.Add(key) c.updateVpcSnatQueue.Add(key) @@ -772,7 +851,7 @@ func (c *Controller) initCreateAt(key string) (err error) { return nil } -func (c *Controller) updateCrdNatGw(key string) error { +func (c *Controller) updateCrdNatGwLabels(key string, qos string) error { gw, err := c.config.KubeOvnClient.KubeovnV1().VpcNatGateways().Get(context.Background(), key, metav1.GetOptions{}) if err != nil { errMsg := fmt.Errorf("failed to get vpc nat gw '%s', %v", key, err) @@ -787,6 +866,7 @@ func (c *Controller) updateCrdNatGw(key string) error { gw.Labels = map[string]string{ util.SubnetNameLabel: gw.Spec.Subnet, util.VpcNameLabel: gw.Spec.Vpc, + util.QoSLabel: qos, } needUpdateLabel = true } else { @@ -800,6 +880,11 @@ func (c *Controller) updateCrdNatGw(key string) error { gw.Labels[util.VpcNameLabel] = gw.Spec.Vpc needUpdateLabel = true } + if gw.Labels[util.QoSLabel] != qos { + op = "replace" + gw.Labels[util.QoSLabel] = qos + needUpdateLabel = true + } } if needUpdateLabel { patchPayloadTemplate := `[{ "op": "%s", "path": "/metadata/labels", "value": %s }]` @@ -828,3 +913,158 @@ func (c *Controller) getNatGw(router, subnet string) (string, error) { } return "", fmt.Errorf("too many nat gw") } + +func (c *Controller) patchNatGwQoSStatus(key, qos string) error { + // add qos label to vpc nat gw + var changed bool + oriGw, err := c.vpcNatGatewayLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to get vpc nat gw %s, %v", key, err) + return err + } + gw := oriGw.DeepCopy() + + // update status.qosPolicy + if gw.Status.QoSPolicy != qos { + gw.Status.QoSPolicy = qos + changed = true + } + + if changed { + bytes, err := gw.Status.Bytes() + if err != nil { + klog.Errorf("failed to marshal vpc nat gw %s status, %v", gw.Name, err) + return err + } + if _, err = c.config.KubeOvnClient.KubeovnV1().VpcNatGateways().Patch(context.Background(), gw.Name, types.MergePatchType, + bytes, metav1.PatchOptions{}, "status"); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to patch gw %s, %v", gw.Name, err) + return err + } + } + return nil +} + +func (c *Controller) patchNatGwStatus(key string) error { + var changed bool + oriGw, err := c.vpcNatGatewayLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to get vpc nat gw %s, %v", key, err) + return err + } + gw := oriGw.DeepCopy() + + if !reflect.DeepEqual(gw.Spec.ExternalSubnets, gw.Status.ExternalSubnets) { + gw.Status.ExternalSubnets = gw.Spec.ExternalSubnets + changed = true + } + if !reflect.DeepEqual(gw.Spec.Selector, gw.Status.Selector) { + gw.Status.Selector = gw.Spec.Selector + changed = true + } + if !reflect.DeepEqual(gw.Spec.Tolerations, gw.Status.Tolerations) { + gw.Status.Tolerations = gw.Spec.Tolerations + changed = true + } + if !reflect.DeepEqual(gw.Spec.Affinity, gw.Status.Affinity) { + gw.Status.Affinity = gw.Spec.Affinity + changed = true + } + + if changed { + bytes, err := gw.Status.Bytes() + if err != nil { + return err + } + if _, err = c.config.KubeOvnClient.KubeovnV1().VpcNatGateways().Patch(context.Background(), gw.Name, types.MergePatchType, + bytes, metav1.PatchOptions{}, "status"); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to patch gw %s, %v", gw.Name, err) + return err + } + } + return nil +} + +func (c *Controller) execNatGwQoS(gw *kubeovnv1.VpcNatGateway, qos string, operation string) error { + var err error + qosPolicy, err := c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Get(context.Background(), qos, metav1.GetOptions{}) + if err != nil { + klog.Errorf("get qos policy %s failed: %v", qos, err) + return err + } + if !qosPolicy.Status.Shared { + err := fmt.Errorf("not support unshared qos policy %s to related to gw", qos) + klog.Error(err) + return err + } + if qosPolicy.Status.BindingType != kubeovnv1.QoSBindingTypeNatGw { + err := fmt.Errorf("not support qos policy %s binding type %s to related to gw", qos, qosPolicy.Status.BindingType) + klog.Error(err) + return err + } + return c.execNatGwBandtithLimitRules(gw, qosPolicy.Status.BandwidthLimitRules, operation) +} + +func (c *Controller) execNatGwBandtithLimitRules(gw *kubeovnv1.VpcNatGateway, rules kubeovnv1.QoSPolicyBandwidthLimitRules, operation string) error { + var err error + for _, rule := range rules { + if err = c.execNatGwQoSInPod(gw.Name, rule, operation); err != nil { + klog.Errorf("failed to %s ingress gw '%s' qos in pod, %v", operation, gw.Name, err) + return err + } + } + return nil +} + +func (c *Controller) execNatGwQoSInPod( + dp string, r *kubeovnv1.QoSPolicyBandwidthLimitRule, operation string) error { + gwPod, err := c.getNatGwPod(dp) + if err != nil { + klog.Errorf("failed to get nat gw pod, %v", err) + return err + } + var addRules []string + var classifierType, matchDirection, cidr string + if r.MatchType == "ip" { + classifierType = "u32" + // matchValue: dst xxx.xxx.xxx.xxx/32 + splitStr := strings.Split(r.MatchValue, " ") + if len(splitStr) != 2 { + err := fmt.Errorf("matchValue %s format error", r.MatchValue) + klog.Error(err) + return err + } + matchDirection = splitStr[0] + cidr = splitStr[1] + } else if r.MatchType == "" { + classifierType = "matchall" + } else { + err := fmt.Errorf("MatchType %s format error", r.MatchType) + klog.Error(err) + return err + } + rule := fmt.Sprintf("%s,%s,%d,%s,%s,%s,%s,%s,%s", + r.Direction, r.Interface, r.Priority, + classifierType, r.MatchType, matchDirection, + cidr, r.RateMax, r.BurstMax) + addRules = append(addRules, rule) + + if err = c.execNatGwRules(gwPod, operation, addRules); err != nil { + err = fmt.Errorf("failed to exec nat gateway rule, err: %v", err) + klog.Error(err) + return err + } + return nil +} diff --git a/yamls/crd.yaml b/yamls/crd.yaml index 80d2fe29a59..32a1e901345 100644 --- a/yamls/crd.yaml +++ b/yamls/crd.yaml @@ -165,10 +165,317 @@ spec: name: v1 served: true storage: true + subresources: + status: {} schema: openAPIV3Schema: type: object properties: + status: + type: object + properties: + externalSubnets: + items: + type: string + type: array + selector: + type: array + items: + type: string + qosPolicy: + type: string + tolerations: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + enum: + - Equal + - Exists + value: + type: string + effect: + type: string + enum: + - NoExecute + - NoSchedule + - PreferNoSchedule + tolerationSeconds: + type: integer + affinity: + properties: + nodeAffinity: + properties: + preferredDuringSchedulingIgnoredDuringExecution: + items: + properties: + preference: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + weight: + format: int32 + type: integer + required: + - preference + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + properties: + nodeSelectorTerms: + items: + properties: + matchExpressions: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchFields: + items: + properties: + key: + type: string + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + type: object + type: array + required: + - nodeSelectorTerms + type: object + type: object + podAffinity: + properties: + preferredDuringSchedulingIgnoredDuringExecution: + items: + properties: + podAffinityTerm: + properties: + labelSelector: + properties: + matchExpressions: + items: + properties: + key: + type: string + x-kubernetes-patch-strategy: merge + x-kubernetes-patch-merge-key: key + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + namespaces: + items: + type: string + type: array + topologyKey: + type: string + required: + - topologyKey + type: object + weight: + format: int32 + type: integer + required: + - podAffinityTerm + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + items: + properties: + labelSelector: + properties: + matchExpressions: + items: + properties: + key: + type: string + x-kubernetes-patch-strategy: merge + x-kubernetes-patch-merge-key: key + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + namespaces: + items: + type: string + type: array + topologyKey: + type: string + required: + - topologyKey + type: object + type: array + type: object + podAntiAffinity: + properties: + preferredDuringSchedulingIgnoredDuringExecution: + items: + properties: + podAffinityTerm: + properties: + labelSelector: + properties: + matchExpressions: + items: + properties: + key: + type: string + x-kubernetes-patch-strategy: merge + x-kubernetes-patch-merge-key: key + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + namespaces: + items: + type: string + type: array + topologyKey: + type: string + required: + - topologyKey + type: object + weight: + format: int32 + type: integer + required: + - podAffinityTerm + - weight + type: object + type: array + requiredDuringSchedulingIgnoredDuringExecution: + items: + properties: + labelSelector: + properties: + matchExpressions: + items: + properties: + key: + type: string + x-kubernetes-patch-strategy: merge + x-kubernetes-patch-merge-key: key + operator: + type: string + values: + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + type: object + type: object + namespaces: + items: + type: string + type: array + topologyKey: + type: string + required: + - topologyKey + type: object + type: array + type: object + type: object spec: type: object properties: @@ -186,6 +493,8 @@ spec: type: array items: type: string + qosPolicy: + type: string tolerations: type: array items: diff --git a/yamls/ovn-dpdk.yaml b/yamls/ovn-dpdk.yaml index 2622c57d076..387ffbf133d 100644 --- a/yamls/ovn-dpdk.yaml +++ b/yamls/ovn-dpdk.yaml @@ -18,6 +18,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips diff --git a/yamls/ovn-ha.yaml b/yamls/ovn-ha.yaml index df33d67877a..1a7cbcf13cb 100644 --- a/yamls/ovn-ha.yaml +++ b/yamls/ovn-ha.yaml @@ -17,6 +17,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips