diff --git a/charts/templates/ovn-CR.yaml b/charts/templates/ovn-CR.yaml index 5c7730307079..1fd647b584b5 100644 --- a/charts/templates/ovn-CR.yaml +++ b/charts/templates/ovn-CR.yaml @@ -11,6 +11,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips diff --git a/dist/images/install.sh b/dist/images/install.sh index b0990b833416..688b37cca88f 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -385,10 +385,17 @@ spec: name: v1 served: true storage: true + subresources: + status: {} schema: openAPIV3Schema: type: object properties: + status: + type: object + properties: + qosPolicy: + type: string spec: type: object properties: @@ -406,6 +413,8 @@ spec: type: array items: type: string + qosPolicy: + type: string tolerations: type: array items: @@ -2322,6 +2331,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips @@ -2835,6 +2845,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips diff --git a/dist/images/vpcnatgateway/nat-gateway.sh b/dist/images/vpcnatgateway/nat-gateway.sh index abdcb7aa7bdf..9e252281cae3 100644 --- a/dist/images/vpcnatgateway/nat-gateway.sh +++ b/dist/images/vpcnatgateway/nat-gateway.sh @@ -242,11 +242,13 @@ function del_dnat() { # example usage: -# delete_tc_filter "1:0" "192.168.1.1" "src" -function delete_tc_filter() { - qdisc_id=$1 - v4ip=$2 - direction=$3 +# delete_tc_u32_filter "net1" "1:0" "192.168.1.1" "src" +function delete_tc_u32_filter() { + dev=$1 + qdisc_id=$2 + cidr=$3 + matchDirection=$4 + # tc -p -s -d filter show dev net1 parent $qdisc_id # filter protocol ip pref 10 u32 chain 0 @@ -259,16 +261,16 @@ function delete_tc_filter() { # Sent 0 bytes 0 pkts (dropped 0, overlimits 0) # get the corresponding filterID by the EIP, and use the filterID to delete the corresponding filtering rule. - ipList=$(tc -p -s -d filter show dev net1 parent $qdisc_id | grep "match IP " | awk '{print $4}') + ipList=$(tc -p -s -d filter show dev $dev parent $qdisc_id | grep -E "match IP src|dst ([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}") i=0 - for line in $ipList; do + echo "$ipList" | while read line; do i=$((i+1)) - if echo "$line" | grep $v4ip; then - result=$(tc -p -s -d filter show dev net1 parent $qdisc_id | grep "filter protocol ip pref [0-9]\+ u32 \(fh\|chain [0-9]\+ fh\) \(\w\+::\w\+\) *" | awk '{print $5,$10}' | sed -n $i"p") + if echo "$line" | grep "$matchDirection $cidr"; then + result=$(tc -p -s -d filter show dev $dev parent $qdisc_id | grep "filter protocol ip pref [0-9]\+ u32 \(fh\|chain [0-9]\+ fh\) \(\w\+::\w\+\) *" | awk '{print $5,$10}' | sed -n $i"p") arr=($result) pref=${arr[0]} filterID=${arr[1]} - exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio $pref handle $filterID u32" + exec_cmd "tc filter del dev $dev parent $qdisc_id protocol ip prio $pref handle $filterID u32" break fi done @@ -285,16 +287,17 @@ function eip_ingress_qos_add() { priority=${arr[1]} rate=${arr[2]} burst=${arr[3]} - direction="dst" - tc qdisc add dev net1 ingress 2>/dev/nul || true + dev="net1" + matchDirection="dst" + tc qdisc add dev $dev ingress 2>/dev/nul || true # get qdisc id - qdisc_id=$(tc qdisc show dev net1 ingress | awk '{print $3}') + qdisc_id=$(tc qdisc show dev $dev ingress | awk '{print $3}') # del old filter - tc -p -s -d filter show dev net1 parent $qdisc_id | grep -w $v4ip + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w $v4ip if [ "$?" -eq 0 ];then - delete_tc_filter $qdisc_id $v4ip $direction + delete_tc_u32_filter $dev $qdisc_id $v4ip $matchDirection fi - exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio $priority u32 match ip $direction $v4ip police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" + exec_cmd "tc filter add dev $dev parent $qdisc_id protocol ip prio $priority u32 match ip $matchDirection $v4ip police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" done } @@ -310,14 +313,90 @@ function eip_egress_qos_add() { rate=${arr[2]} burst=${arr[3]} qdisc_id="1:0" - direction="src" - tc qdisc add dev net1 root handle $qdisc_id htb 2>/dev/nul || true + matchDirection="src" + dev="net1" + tc qdisc add dev $dev root handle $qdisc_id htb 2>/dev/nul || true # del old filter - tc -p -s -d filter show dev net1 parent $qdisc_id | grep -w $v4ip + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w $v4ip if [ "$?" -eq 0 ];then - delete_tc_filter $qdisc_id $v4ip $direction + delete_tc_u32_filter $dev $qdisc_id $v4ip $matchDirection + fi + exec_cmd "tc filter add dev $dev parent $qdisc_id protocol ip prio $priority u32 match ip $matchDirection $v4ip police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" + done +} + +function qos_add() { + for rule in $@ + do + IFS=',' read -r -a arr <<< "$rule" + local qdiscType=(${arr[0]}) + local dev=${arr[1]} + local priority=${arr[2]} + local classifierType=${arr[3]} + local matchType=${arr[4]} + local matchDirection=${arr[5]} + local cidr=${arr[6]} + local rate=${arr[7]} + local burst=${arr[8]} + + if [ "$qdiscType" == "ingress" ];then + tc qdisc add dev $dev ingress 2>/dev/null || true + # get qdisc id + qdisc_id=$(tc qdisc show dev $dev ingress | awk '{print $3}') + elif [ "$qdiscType" == "egress" ];then + qdisc_id="1:0" + tc qdisc add dev $dev root handle $qdisc_id htb 2>/dev/null || true + fi + + if [ "$classifierType" == "u32" ];then + # del old filter + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w $cidr + if [ "$?" -eq 0 ];then + delete_tc_u32_filter $dev $qdisc_id $cidr $matchDirection + fi + exec_cmd "tc filter add dev $dev parent $qdisc_id protocol ip prio $priority u32 match $matchType $matchDirection $cidr police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" + elif [ "$classifierType" == "matchall" ];then + # del old filter + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w matchall + if [ "$?" -eq 0 ];then + exec_cmd "tc filter del dev $dev parent $qdisc_id protocol ip prio $priority matchall" + fi + exec_cmd "tc filter add dev $dev parent $qdisc_id protocol ip prio $priority matchall action police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" + fi + done +} + +function qos_del() { + for rule in $@ + do + IFS=',' read -r -a arr <<< "$rule" + local qdiscType=(${arr[0]}) + local dev=${arr[1]} + local priority=${arr[2]} + local classifierType=${arr[3]} + local matchType=${arr[4]} + local matchDirection=${arr[5]} + local cidr=${arr[6]} + local rate=${arr[7]} + local burst=${arr[8]} + + if [ "$qdiscType" == "ingress" ];then + qdisc_id=$(tc qdisc show dev $dev ingress | awk '{print $3}') + if [ -z "$qdisc_id" ]; then + exit 0 + fi + elif [ "$qdiscType" == "egress" ];then + qdisc_id="1:0" + fi + # if qdisc_id is empty, this means ingress qdisc is not added, so we don't need to delete filter. + if [ "$classifierType" == "u32" ];then + delete_tc_u32_filter $dev $qdisc_id $cidr $matchDirection + elif [ "$classifierType" == "matchall" ];then + tc -p -s -d filter show dev $dev parent $qdisc_id | grep -w matchall + if [ "$?" -eq 0 ];then + exec_cmd "tc filter del dev $dev parent $qdisc_id protocol ip prio $priority matchall" + fi fi - exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio $priority u32 match ip $direction $v4ip police rate "$rate"Mbit burst "$burst"Mb drop flowid :1" done } @@ -325,12 +404,13 @@ function eip_ingress_qos_del() { for rule in $@ do arr=(${rule//,/ }) - v4ip=(${arr[0]//\// }) - direction="dst" - qdisc_id=$(tc qdisc show dev net1 ingress | awk '{print $3}') + cidr=(${arr[0]//\// }) + matchDirection="dst" + dev="net1" + qdisc_id=$(tc qdisc show dev $dev ingress | awk '{print $3}') # if qdisc_id is empty, this means ingress qdisc is not added, so we don't need to delete filter. if [ -n "$qdisc_id" ]; then - delete_tc_filter $qdisc_id $v4ip $direction + delete_tc_u32_filter $dev $qdisc_id $cidr $matchDirection fi done } @@ -339,10 +419,11 @@ function eip_egress_qos_del() { for rule in $@ do arr=(${rule//,/ }) - v4ip=(${arr[0]//\// }) - direction="src" + cidr=(${arr[0]//\// }) + matchDirection="src" qdisc_id="1:0" - delete_tc_filter $qdisc_id $v4ip $direction + dev="net1" + delete_tc_u32_filter $dev $qdisc_id $cidr $matchDirection done } @@ -422,6 +503,14 @@ case $opt in echo "eip-egress-qos-del $rules" eip_egress_qos_del $rules ;; + qos-add) + echo "qos-add $rules" + qos_add $rules + ;; + qos-del) + echo "qos-del $rules" + qos_del $rules + ;; *) echo "Usage: $0 [init|subnet-route-add|subnet-route-del|eip-add|eip-del|floating-ip-add|floating-ip-del|dnat-add|dnat-del|snat-add|snat-del] ..." exit 1 diff --git a/pkg/apis/kubeovn/v1/status.go b/pkg/apis/kubeovn/v1/status.go index cae4d2968ab9..28f87ff69bf3 100644 --- a/pkg/apis/kubeovn/v1/status.go +++ b/pkg/apis/kubeovn/v1/status.go @@ -137,3 +137,13 @@ func (qoss *QoSPolicyStatus) Bytes() ([]byte, error) { klog.V(5).Info("status body", newStr) return []byte(newStr), nil } + +func (vns *VpcNatStatus) Bytes() ([]byte, error) { + bytes, err := json.Marshal(vns) + if err != nil { + return nil, err + } + newStr := fmt.Sprintf(`{"status": %s}`, string(bytes)) + klog.V(5).Info("status body", newStr) + return []byte(newStr), nil +} diff --git a/pkg/apis/kubeovn/v1/types.go b/pkg/apis/kubeovn/v1/types.go index cd57b7b13184..52a8040574d4 100644 --- a/pkg/apis/kubeovn/v1/types.go +++ b/pkg/apis/kubeovn/v1/types.go @@ -44,7 +44,8 @@ var ( type QoSPolicyBindingType string const ( - QoSBindingTypeEIP QoSPolicyBindingType = "EIP" + QoSBindingTypeEIP QoSPolicyBindingType = "EIP" + QoSBindingTypeNatGw QoSPolicyBindingType = "NATGW" ) type QoSPolicyRuleDirection string @@ -479,7 +480,8 @@ type VpcNatGateway struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec VpcNatSpec `json:"spec"` + Spec VpcNatSpec `json:"spec"` + Status VpcNatStatus `json:"status,omitempty"` } type VpcNatSpec struct { @@ -490,6 +492,11 @@ type VpcNatSpec struct { Selector []string `json:"selector"` Tolerations []corev1.Toleration `json:"tolerations"` Affinity corev1.Affinity `json:"affinity"` + QoSPolicy string `json:"qosPolicy"` +} + +type VpcNatStatus struct { + QoSPolicy string `json:"qosPolicy" patchStrategy:"merge"` } // +genclient diff --git a/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go b/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go index cd2a8a1781f7..f803698dfd8c 100644 --- a/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go @@ -2329,6 +2329,7 @@ func (in *VpcNatGateway) DeepCopyInto(out *VpcNatGateway) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status return } @@ -2417,6 +2418,22 @@ func (in *VpcNatSpec) DeepCopy() *VpcNatSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VpcNatStatus) DeepCopyInto(out *VpcNatStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VpcNatStatus. +func (in *VpcNatStatus) DeepCopy() *VpcNatStatus { + if in == nil { + return nil + } + out := new(VpcNatStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VpcPeering) DeepCopyInto(out *VpcPeering) { *out = *in diff --git a/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_vpcnatgateway.go b/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_vpcnatgateway.go index 2125b85d99ee..0c379f436608 100644 --- a/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_vpcnatgateway.go +++ b/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_vpcnatgateway.go @@ -96,6 +96,17 @@ func (c *FakeVpcNatGateways) Update(ctx context.Context, vpcNatGateway *kubeovnv return obj.(*kubeovnv1.VpcNatGateway), err } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeVpcNatGateways) UpdateStatus(ctx context.Context, vpcNatGateway *kubeovnv1.VpcNatGateway, opts v1.UpdateOptions) (*kubeovnv1.VpcNatGateway, error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateSubresourceAction(vpcnatgatewaysResource, "status", vpcNatGateway), &kubeovnv1.VpcNatGateway{}) + if obj == nil { + return nil, err + } + return obj.(*kubeovnv1.VpcNatGateway), err +} + // Delete takes name of the vpcNatGateway and deletes it. Returns an error if one occurs. func (c *FakeVpcNatGateways) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { _, err := c.Fake. diff --git a/pkg/client/clientset/versioned/typed/kubeovn/v1/vpcnatgateway.go b/pkg/client/clientset/versioned/typed/kubeovn/v1/vpcnatgateway.go index 03d08658f38d..074523ed5335 100644 --- a/pkg/client/clientset/versioned/typed/kubeovn/v1/vpcnatgateway.go +++ b/pkg/client/clientset/versioned/typed/kubeovn/v1/vpcnatgateway.go @@ -40,6 +40,7 @@ type VpcNatGatewaysGetter interface { type VpcNatGatewayInterface interface { Create(ctx context.Context, vpcNatGateway *v1.VpcNatGateway, opts metav1.CreateOptions) (*v1.VpcNatGateway, error) Update(ctx context.Context, vpcNatGateway *v1.VpcNatGateway, opts metav1.UpdateOptions) (*v1.VpcNatGateway, error) + UpdateStatus(ctx context.Context, vpcNatGateway *v1.VpcNatGateway, opts metav1.UpdateOptions) (*v1.VpcNatGateway, error) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.VpcNatGateway, error) @@ -128,6 +129,21 @@ func (c *vpcNatGateways) Update(ctx context.Context, vpcNatGateway *v1.VpcNatGat return } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *vpcNatGateways) UpdateStatus(ctx context.Context, vpcNatGateway *v1.VpcNatGateway, opts metav1.UpdateOptions) (result *v1.VpcNatGateway, err error) { + result = &v1.VpcNatGateway{} + err = c.client.Put(). + Resource("vpc-nat-gateways"). + Name(vpcNatGateway.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(vpcNatGateway). + Do(ctx). + Into(result) + return +} + // Delete takes name of the vpcNatGateway and deletes it. Returns an error if one occurs. func (c *vpcNatGateways) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { return c.client.Delete(). diff --git a/pkg/controller/init.go b/pkg/controller/init.go index 48f864470dea..45033b3f8602 100644 --- a/pkg/controller/init.go +++ b/pkg/controller/init.go @@ -707,7 +707,7 @@ func (c *Controller) initSyncCrdVpcNatGw() error { return err } for _, gw := range gws { - if err := c.updateCrdNatGw(gw.Name); err != nil { + if err := c.updateCrdNatGw(gw.Name, ""); err != nil { klog.Errorf("failed to update nat gw: %v", gw.Name, err) return err } diff --git a/pkg/controller/qos_policy.go b/pkg/controller/qos_policy.go index 7f97d0dbb1f9..8923afae6e60 100644 --- a/pkg/controller/qos_policy.go +++ b/pkg/controller/qos_policy.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "regexp" "sort" kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" @@ -338,6 +339,24 @@ func (c *Controller) reconcileEIPBandtithLimitRules( } func (c *Controller) validateQosPolicy(qosPolicy *kubeovnv1.QoSPolicy) error { + var err error + if qosPolicy.Spec.BandwidthLimitRules != nil { + for _, rule := range qosPolicy.Spec.BandwidthLimitRules { + if rule.MatchType == "ip" { + re := regexp.MustCompile(`^(src|dst) (\d{1,3}\.){3}\d{1,3}/\d{1,2}$`) + if !re.MatchString(rule.MatchValue) { + err = fmt.Errorf("invalid cidr %s", rule.MatchValue) + klog.Error(err) + return err + } + } + } + } + if !qosPolicy.Spec.Shared && qosPolicy.Spec.BindingType == kubeovnv1.QoSBindingTypeNatGw { + err = fmt.Errorf("qos policy %s is not shared, but binding to nat gateway", qosPolicy.Name) + klog.Error(err) + return err + } return nil } @@ -354,17 +373,34 @@ func (c *Controller) handleUpdateQoSPolicy(key string) error { } // should delete if !cachedQos.DeletionTimestamp.IsZero() { - eips, err := c.iptablesEipsLister.List( - labels.SelectorFromSet(labels.Set{util.QoSLabel: key})) - if err != nil { - klog.Errorf("failed to get eip list, %v", err) - return err + if cachedQos.Spec.BindingType == kubeovnv1.QoSBindingTypeEIP { + eips, err := c.iptablesEipsLister.List( + labels.SelectorFromSet(labels.Set{util.QoSLabel: key})) + if err != nil { + klog.Errorf("failed to get eip list, %v", err) + return err + } + if len(eips) != 0 { + err = fmt.Errorf("qos policy %s is being used", key) + klog.Error(err) + return err + } } - if len(eips) != 0 { - err = fmt.Errorf("qos policy %s is being used", key) - klog.Error(err) - return err + + if cachedQos.Spec.BindingType == kubeovnv1.QoSBindingTypeNatGw { + gws, err := c.vpcNatGatewayLister.List( + labels.SelectorFromSet(labels.Set{util.QoSLabel: key})) + if err != nil { + klog.Errorf("failed to get gw list, %v", err) + return err + } + if len(gws) != 0 { + err = fmt.Errorf("qos policy %s is being used", key) + klog.Error(err) + return err + } } + if err = c.handleDelQoSPoliciesFinalizer(key); err != nil { klog.Errorf("failed to handle del finalizer for qos %s, %v", key, err) return err diff --git a/pkg/controller/vpc_nat_gateway.go b/pkg/controller/vpc_nat_gateway.go index 9459eb7a0ffc..0e8508e97ed5 100644 --- a/pkg/controller/vpc_nat_gateway.go +++ b/pkg/controller/vpc_nat_gateway.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "reflect" "regexp" "strings" "time" @@ -41,6 +42,8 @@ const ( natGwSnatDel = "snat-del" natGwEipIngressQoSAdd = "eip-ingress-qos-add" natGwEipIngressQoSDel = "eip-ingress-qos-del" + QoSAdd = "qos-add" + QoSDel = "qos-del" natGwEipEgressQoSAdd = "eip-egress-qos-add" natGwEipEgressQoSDel = "eip-egress-qos-del" natGwSubnetFipAdd = "floating-ip-add" @@ -118,6 +121,7 @@ func (c *Controller) enqueueUpdateVpcNatGw(old, new interface{}) { utilruntime.HandleError(err) return } + klog.V(3).Infof("enqueue update vpc-nat-gw %s", key) c.addOrUpdateVpcNatGatewayQueue.Add(key) } @@ -128,6 +132,7 @@ func (c *Controller) enqueueDeleteVpcNatGw(obj interface{}) { utilruntime.HandleError(err) return } + klog.V(3).Infof("enqueue del vpc-nat-gw %s", key) c.delVpcNatGatewayQueue.Add(key) } @@ -216,6 +221,32 @@ func (c *Controller) handleDelVpcNatGw(key string) error { return nil } +func isStatefulSetChanged(oldSts, newSts *v1.StatefulSet) bool { + if oldSts.ObjectMeta.Name != newSts.ObjectMeta.Name || !reflect.DeepEqual(oldSts.ObjectMeta.Labels, newSts.ObjectMeta.Labels) { + return true + } + + if *oldSts.Spec.Replicas != *newSts.Spec.Replicas || + !reflect.DeepEqual(oldSts.Spec.Selector, newSts.Spec.Selector) || + !reflect.DeepEqual(oldSts.Spec.Template.ObjectMeta.Labels, newSts.Spec.Template.ObjectMeta.Labels) || + !reflect.DeepEqual(oldSts.Spec.Template.ObjectMeta.Annotations, newSts.Spec.Template.ObjectMeta.Annotations) || + len(oldSts.Spec.Template.Spec.Containers) != len(newSts.Spec.Template.Spec.Containers) || + !reflect.DeepEqual(oldSts.Spec.Template.Spec.Containers[0].Name, newSts.Spec.Template.Spec.Containers[0].Name) || + !reflect.DeepEqual(oldSts.Spec.Template.Spec.Containers[0].Image, newSts.Spec.Template.Spec.Containers[0].Image) || + !reflect.DeepEqual(oldSts.Spec.Template.Spec.Containers[0].Command, newSts.Spec.Template.Spec.Containers[0].Command) || + !reflect.DeepEqual(oldSts.Spec.Template.Spec.Containers[0].Args, newSts.Spec.Template.Spec.Containers[0].Args) || + !reflect.DeepEqual(oldSts.Spec.Template.Spec.Containers[0].ImagePullPolicy, newSts.Spec.Template.Spec.Containers[0].ImagePullPolicy) || + !reflect.DeepEqual(oldSts.Spec.Template.Spec.Containers[0].SecurityContext, newSts.Spec.Template.Spec.Containers[0].SecurityContext) || + !reflect.DeepEqual(oldSts.Spec.Template.Spec.NodeSelector, newSts.Spec.Template.Spec.NodeSelector) || + !reflect.DeepEqual(oldSts.Spec.Template.Spec.Tolerations, newSts.Spec.Template.Spec.Tolerations) || + !reflect.DeepEqual(oldSts.Spec.Template.Spec.Affinity, newSts.Spec.Template.Spec.Affinity) || + !reflect.DeepEqual(oldSts.Spec.UpdateStrategy, newSts.Spec.UpdateStrategy) { + return true + } + + return false +} + func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { // create nat gw statefulset c.vpcNatGwKeyMutex.Lock(key) @@ -243,6 +274,7 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { // check or create statefulset needToCreate := false + needToUpdate := false oldSts, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace). Get(context.Background(), genNatGwStsName(gw.Name), metav1.GetOptions{}) @@ -253,8 +285,10 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { return err } } - newSts := c.genNatGwStatefulSet(gw, oldSts.DeepCopy()) + if isStatefulSetChanged(oldSts, newSts) { + needToUpdate = true + } if needToCreate { // if pod create successfully, will add initVpcNatGatewayQueue @@ -265,14 +299,36 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error { return err } return nil - } else { + } else if needToUpdate { if _, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace). Update(context.Background(), newSts, metav1.UpdateOptions{}); err != nil { err := fmt.Errorf("failed to update statefulset '%s', err: %v", newSts.Name, err) klog.Error(err) return err } + } else { + // check if need to change qos + if gw.Spec.QoSPolicy != gw.Status.QoSPolicy { + if gw.Status.QoSPolicy != "" { + if err = c.execNatGwQoS(gw, gw.Status.QoSPolicy, QoSDel); err != nil { + klog.Errorf("failed to add qos for natgw %s, %v", key, err) + return err + } + } + if gw.Spec.QoSPolicy != "" { + if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil { + klog.Errorf("failed to del qos for natgw %s, %v", key, err) + return err + } + } + // if update qos success, will update nat gw status\ + if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil { + klog.Errorf("failed to patch status for natgw %s, %v", key, err) + return err + } + } } + return nil } @@ -296,12 +352,6 @@ func (c *Controller) handleInitVpcNatGw(key string) error { return fmt.Errorf("failed to get subnet %s", gw.Spec.Subnet) } - if err := c.updateCrdNatGw(gw.Name); err != nil { - err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err) - klog.Error(err) - return err - } - oriPod, err := c.getNatGwPod(key) if err != nil { err := fmt.Errorf("failed to get nat gw %s pod: %v", gw.Name, err) @@ -325,6 +375,27 @@ func (c *Controller) handleInitVpcNatGw(key string) error { klog.Error(err) return err } + + if gw.Spec.QoSPolicy != "" { + if err = c.execNatGwQoS(gw, gw.Spec.QoSPolicy, QoSAdd); err != nil { + klog.Errorf("failed to add qos for natgw %s, %v", key, err) + return err + } + } + // if update qos success, will update nat gw status + if gw.Spec.QoSPolicy != gw.Status.QoSPolicy { + if err = c.patchNatGwQoSStatus(key, gw.Spec.QoSPolicy); err != nil { + klog.Errorf("failed to patch status for natgw %s, %v", key, err) + return err + } + } + + if err := c.updateCrdNatGw(gw.Name, gw.Spec.QoSPolicy); err != nil { + err := fmt.Errorf("failed to update nat gw %s: %v", gw.Name, err) + klog.Error(err) + return err + } + c.updateVpcFloatingIpQueue.Add(key) c.updateVpcDnatQueue.Add(key) c.updateVpcSnatQueue.Add(key) @@ -772,7 +843,7 @@ func (c *Controller) initCreateAt(key string) (err error) { return nil } -func (c *Controller) updateCrdNatGw(key string) error { +func (c *Controller) updateCrdNatGw(key string, qos string) error { gw, err := c.config.KubeOvnClient.KubeovnV1().VpcNatGateways().Get(context.Background(), key, metav1.GetOptions{}) if err != nil { errMsg := fmt.Errorf("failed to get vpc nat gw '%s', %v", key, err) @@ -787,6 +858,7 @@ func (c *Controller) updateCrdNatGw(key string) error { gw.Labels = map[string]string{ util.SubnetNameLabel: gw.Spec.Subnet, util.VpcNameLabel: gw.Spec.Vpc, + util.QoSLabel: qos, } needUpdateLabel = true } else { @@ -800,6 +872,11 @@ func (c *Controller) updateCrdNatGw(key string) error { gw.Labels[util.VpcNameLabel] = gw.Spec.Vpc needUpdateLabel = true } + if gw.Labels[util.QoSLabel] != qos { + op = "replace" + gw.Labels[util.QoSLabel] = qos + needUpdateLabel = true + } } if needUpdateLabel { patchPayloadTemplate := `[{ "op": "%s", "path": "/metadata/labels", "value": %s }]` @@ -828,3 +905,107 @@ func (c *Controller) getNatGw(router, subnet string) (string, error) { } return "", fmt.Errorf("too many nat gw") } + +func (c *Controller) patchNatGwQoSStatus(key, qos string) error { + // add qos label to vpc nat gw + var changed bool + oriGw, err := c.vpcNatGatewayLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + gw := oriGw.DeepCopy() + + // update status.qosPolicy + if gw.Status.QoSPolicy != qos { + gw.Status.QoSPolicy = qos + changed = true + } + + if changed { + bytes, err := gw.Status.Bytes() + if err != nil { + return err + } + if _, err = c.config.KubeOvnClient.KubeovnV1().VpcNatGateways().Patch(context.Background(), gw.Name, types.MergePatchType, + bytes, metav1.PatchOptions{}, "status"); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to patch gw %s, %v", gw.Name, err) + return err + } + } + return nil +} + +func (c *Controller) execNatGwQoS(gw *kubeovnv1.VpcNatGateway, qos string, operation string) error { + var err error + qosPolicy, err := c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Get(context.Background(), qos, metav1.GetOptions{}) + if err != nil { + klog.Errorf("get qos policy %s failed: %v", qos, err) + return err + } + if !qosPolicy.Status.Shared { + err := fmt.Errorf("not support unshared qos policy %s to related to gw", qos) + klog.Error(err) + return err + } + if qosPolicy.Status.BindingType != kubeovnv1.QoSBindingTypeNatGw { + err := fmt.Errorf("not support qos policy %s binding type %s to related to gw", qos, qosPolicy.Status.BindingType) + klog.Error(err) + return err + } + return c.execNatGwBandtithLimitRules(gw, qosPolicy.Status.BandwidthLimitRules, operation) +} + +func (c *Controller) execNatGwBandtithLimitRules(gw *kubeovnv1.VpcNatGateway, rules kubeovnv1.QoSPolicyBandwidthLimitRules, operation string) error { + var err error + for _, rule := range rules { + if err = c.execNatGwQoSInPod(gw.Name, rule, operation); err != nil { + klog.Errorf("failed to %s ingress gw '%s' qos in pod, %v", operation, gw.Name, err) + return err + } + } + return nil +} + +func (c *Controller) execNatGwQoSInPod( + dp string, r *kubeovnv1.QoSPolicyBandwidthLimitRule, operation string) error { + gwPod, err := c.getNatGwPod(dp) + if err != nil { + return err + } + var addRules []string + var classifierType, matchDirection, cidr string + if r.MatchType == "ip" { + classifierType = "u32" + // matchValue: dst xxx.xxx.xxx.xxx/32 + splitStr := strings.Split(r.MatchValue, " ") + if len(splitStr) != 2 { + err := fmt.Errorf("matchValue %s format error", r.MatchValue) + klog.Error(err) + return err + } + matchDirection = splitStr[0] + cidr = splitStr[1] + } else if r.MatchType == "" { + classifierType = "matchall" + } else { + err := fmt.Errorf("MatchType %s format error", r.MatchType) + klog.Error(err) + return err + } + rule := fmt.Sprintf("%s,%s,%d,%s,%s,%s,%s,%s,%s", + r.Direction, r.Interface, r.Priority, + classifierType, r.MatchType, matchDirection, + cidr, r.RateMax, r.BurstMax) + addRules = append(addRules, rule) + + if err = c.execNatGwRules(gwPod, operation, addRules); err != nil { + return err + } + return nil +} diff --git a/yamls/crd.yaml b/yamls/crd.yaml index 80d2fe29a59d..7597e776e0a0 100644 --- a/yamls/crd.yaml +++ b/yamls/crd.yaml @@ -165,10 +165,17 @@ spec: name: v1 served: true storage: true + subresources: + status: {} schema: openAPIV3Schema: type: object properties: + status: + type: object + properties: + qosPolicy: + type: string spec: type: object properties: @@ -186,6 +193,8 @@ spec: type: array items: type: string + qosPolicy: + type: string tolerations: type: array items: diff --git a/yamls/ovn-dpdk.yaml b/yamls/ovn-dpdk.yaml index 2622c57d0764..387ffbf133d9 100644 --- a/yamls/ovn-dpdk.yaml +++ b/yamls/ovn-dpdk.yaml @@ -18,6 +18,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips diff --git a/yamls/ovn-ha.yaml b/yamls/ovn-ha.yaml index df33d67877a5..1a7cbcf13cb7 100644 --- a/yamls/ovn-ha.yaml +++ b/yamls/ovn-ha.yaml @@ -17,6 +17,7 @@ rules: - vpcs - vpcs/status - vpc-nat-gateways + - vpc-nat-gateways/status - subnets - subnets/status - ips