From c6be3079f28e88b0cf52405724158a2460c40274 Mon Sep 17 00:00:00 2001 From: shane Date: Fri, 24 Mar 2023 14:38:18 +0000 Subject: [PATCH 1/5] feat: add support for EIP QoS 1. Create a QoSPolicy CRD and manage its lifecycle. 2. Add a new "spec.qosPolicy" field to iptablesEIP to relate to the QoSPolicy. 3. Configure NATGW EIP QoS when "spec.qosPolicy" changes. 4. Implement NATGW EIP QoS functionality using Linux TC. --- dist/images/cleanup.sh | 2 +- dist/images/install.sh | 77 +++++ dist/images/vpcnatgateway/nat-gateway.sh | 81 +++++ kubeovn-helm/templates/ovn-CR.yaml | 2 + pkg/apis/kubeovn/v1/register.go | 2 + pkg/apis/kubeovn/v1/status.go | 10 + pkg/apis/kubeovn/v1/types.go | 75 +++- pkg/apis/kubeovn/v1/zz_generated.deepcopy.go | 136 ++++++++ pkg/client/clientset/versioned/clientset.go | 3 +- .../kubeovn/v1/fake/fake_kubeovn_client.go | 4 + .../typed/kubeovn/v1/fake/fake_qospolicy.go | 133 +++++++ .../typed/kubeovn/v1/generated_expansion.go | 2 + .../typed/kubeovn/v1/kubeovn_client.go | 5 + .../versioned/typed/kubeovn/v1/qospolicy.go | 184 ++++++++++ .../informers/externalversions/factory.go | 79 ++++- .../informers/externalversions/generic.go | 2 + .../externalversions/kubeovn/v1/interface.go | 7 + .../externalversions/kubeovn/v1/qospolicy.go | 89 +++++ .../listers/kubeovn/v1/expansion_generated.go | 4 + pkg/client/listers/kubeovn/v1/qospolicy.go | 68 ++++ pkg/controller/controller.go | 30 ++ pkg/controller/pod_iptables_eip.go | 2 +- pkg/controller/qos_policy.go | 325 ++++++++++++++++++ pkg/controller/vpc_nat_gateway.go | 6 +- pkg/controller/vpc_nat_gw_eip.go | 232 ++++++++++++- pkg/controller/vpc_nat_gw_nat.go | 6 +- pkg/util/const.go | 4 + yamls/crd.yaml | 73 ++++ yamls/ovn-dpdk.yaml | 2 + yamls/ovn-ha.yaml | 2 + 30 files changed, 1623 insertions(+), 24 deletions(-) create mode 100644 pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_qospolicy.go create mode 100644 pkg/client/clientset/versioned/typed/kubeovn/v1/qospolicy.go create mode 100644 pkg/client/informers/externalversions/kubeovn/v1/qospolicy.go create mode 100644 pkg/client/listers/kubeovn/v1/qospolicy.go create mode 100644 pkg/controller/qos_policy.go diff --git a/dist/images/cleanup.sh b/dist/images/cleanup.sh index c029840755d..d7335c4e242 100644 --- a/dist/images/cleanup.sh +++ b/dist/images/cleanup.sh @@ -113,7 +113,7 @@ kubectl delete --ignore-not-found crd htbqoses.kubeovn.io security-groups.kubeov vpc-nat-gateways.kubeovn.io vpcs.kubeovn.io vlans.kubeovn.io provider-networks.kubeovn.io \ iptables-dnat-rules.kubeovn.io iptables-eips.kubeovn.io iptables-fip-rules.kubeovn.io \ iptables-snat-rules.kubeovn.io vips.kubeovn.io switch-lb-rules.kubeovn.io vpc-dnses.kubeovn.io \ - ovn-eips.kubeovn.io ovn-fips.kubeovn.io ovn-snat-rules.kubeovn.io + ovn-eips.kubeovn.io ovn-fips.kubeovn.io ovn-snat-rules.kubeovn.io qos-policies.kubeovn.io # Remove annotations/labels in namespaces and nodes kubectl annotate no --all ovn.kubernetes.io/cidr- diff --git a/dist/images/install.sh b/dist/images/install.sh index caf3c50b76c..7fc6cf53acb 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -467,6 +467,8 @@ spec: type: string redo: type: string + qosPolicy: + type: string conditions: type: array items: @@ -495,6 +497,8 @@ spec: type: string natGwDp: type: string + qosPolicy: + type: string --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -1790,6 +1794,75 @@ spec: status: {} conversion: strategy: None +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: qos-policies.kubeovn.io +spec: + group: kubeovn.io + names: + plural: qos-policies + singular: qos-policy + shortNames: + - qos + kind: QoSPolicy + listKind: QoSPolicyList + scope: Cluster + versions: + - name: v1 + served: true + storage: true + subresources: + status: {} + additionalPrinterColumns: + - jsonPath: .spec.bandwidthLimitRule.ingressMax + name: IngressMax + type: string + - jsonPath: .spec.bandwidthLimitRule.egressMax + name: EgressMax + type: string + schema: + openAPIV3Schema: + type: object + properties: + status: + type: object + properties: + bandwidthLimitRule: + type: object + properties: + ingressMax: + type: string + egressMax: + type: string + conditions: + type: array + items: + type: object + properties: + type: + type: string + status: + type: string + reason: + type: string + message: + type: string + lastUpdateTime: + type: string + lastTransitionTime: + type: string + spec: + type: object + properties: + bandwidthLimitRule: + type: object + properties: + ingressMax: + type: string + egressMax: + type: string EOF if $DPDK; then @@ -1844,6 +1917,8 @@ rules: - switch-lb-rules/status - vpc-dnses - vpc-dnses/status + - qos-policies + - qos-policies/status verbs: - "*" - apiGroups: @@ -2353,6 +2428,8 @@ rules: - vpc-dnses/status - switch-lb-rules - switch-lb-rules/status + - qos-policies + - qos-policies/status verbs: - "*" - apiGroups: diff --git a/dist/images/vpcnatgateway/nat-gateway.sh b/dist/images/vpcnatgateway/nat-gateway.sh index 1051528dc14..4b490cc357c 100644 --- a/dist/images/vpcnatgateway/nat-gateway.sh +++ b/dist/images/vpcnatgateway/nat-gateway.sh @@ -240,6 +240,71 @@ function del_dnat() { done } + +function eip_ingress_qos_add() { + for rule in $@ + do + arr=(${rule//,/ }) + v4ip=(${arr[0]//\// }) + rate=${arr[1]} + tc qdisc add dev net1 ingress 2>/dev/nul || true + # get qdisc id + qdisc_id=$(tc qdisc show dev net1 ingress | awk '{print $3}') + # del old filter + tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip + if [ "$?" -eq 0 ];then + exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip dst $v4ip" + fi + exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip dst $v4ip police rate "$rate"Mbit burst "$rate"Mb drop flowid :1" + done +} + +function eip_egress_qos_add() { + for rule in $@ + do + arr=(${rule//,/ }) + v4ip=(${arr[0]//\// }) + rate=${arr[1]} + qdisc_id="1:0" + tc qdisc add dev net1 root handle $qdisc_id htb 2>/dev/nul || true + # del old filter + tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip + if [ "$?" -eq 0 ];then + exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip src $v4ip" + fi + exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip src $v4ip police rate "$rate"Mbit burst "$rate"Mb drop flowid :1" + done +} + +function eip_ingress_qos_del() { + for rule in $@ + do + arr=(${rule//,/ }) + v4ip=(${arr[0]//\// }) + + qdisc_id=$(tc qdisc show dev net1 ingress | awk '{print $3}') + tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip + if [ "$?" -eq 0 ];then + exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip dst $v4ip" + fi + done +} + +function eip_egress_qos_del() { + for rule in $@ + do + arr=(${rule//,/ }) + v4ip=(${arr[0]//\// }) + + qdisc_id="1:0" + tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip + if [ "$?" -eq 0 ];then + exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip src $v4ip" + fi + done +} + + rules=${@:2:${#}} opt=$1 case $opt in @@ -299,6 +364,22 @@ case $opt in echo "get-iptables-version $rules" get_iptables_version $rules ;; + eip-ingress-qos-add) + echo "eip-ingress-qos-add $rules" + eip_ingress_qos_add $rules + ;; + eip-egress-qos-add) + echo "eip-egress-qos-add $rules" + eip_egress_qos_add $rules + ;; + eip-ingress-qos-del) + echo "eip-ingress-qos-del $rules" + eip_ingress_qos_del $rules + ;; + eip-egress-qos-del) + echo "eip-egress-qos-del $rules" + eip_egress_qos_del $rules + ;; *) echo "Usage: $0 [init|subnet-route-add|subnet-route-del|eip-add|eip-del|floating-ip-add|floating-ip-del|dnat-add|dnat-del|snat-add|snat-del] ..." exit 1 diff --git a/kubeovn-helm/templates/ovn-CR.yaml b/kubeovn-helm/templates/ovn-CR.yaml index 131478cc443..98810dbebdb 100644 --- a/kubeovn-helm/templates/ovn-CR.yaml +++ b/kubeovn-helm/templates/ovn-CR.yaml @@ -40,6 +40,8 @@ rules: - vpc-dnses/status - switch-lb-rules - switch-lb-rules/status + - qos-policies + - qos-policies/status verbs: - "*" - apiGroups: diff --git a/pkg/apis/kubeovn/v1/register.go b/pkg/apis/kubeovn/v1/register.go index 472156939a2..b3534242c73 100644 --- a/pkg/apis/kubeovn/v1/register.go +++ b/pkg/apis/kubeovn/v1/register.go @@ -65,6 +65,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &SwitchLBRuleList{}, &VpcDns{}, &VpcDnsList{}, + &QoSPolicy{}, + &QoSPolicyList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/kubeovn/v1/status.go b/pkg/apis/kubeovn/v1/status.go index 291b97c6589..b4a5e147ba2 100644 --- a/pkg/apis/kubeovn/v1/status.go +++ b/pkg/apis/kubeovn/v1/status.go @@ -117,3 +117,13 @@ func (osrs *OvnSnatRuleStatus) Bytes() ([]byte, error) { klog.V(5).Info("status body", newStr) return []byte(newStr), nil } + +func (qoss *QoSPolicyStatus) Bytes() ([]byte, error) { + bytes, err := json.Marshal(qoss) + if err != nil { + return nil, err + } + newStr := fmt.Sprintf(`{"status": %s}`, string(bytes)) + klog.V(5).Info("status body", newStr) + return []byte(newStr), nil +} diff --git a/pkg/apis/kubeovn/v1/types.go b/pkg/apis/kubeovn/v1/types.go index 61fb1fb3a9a..1e4ec9dcd6f 100644 --- a/pkg/apis/kubeovn/v1/types.go +++ b/pkg/apis/kubeovn/v1/types.go @@ -494,6 +494,7 @@ type IptablesEipSpec struct { V6ip string `json:"v6ip"` MacAddress string `json:"macAddress"` NatGwDp string `json:"natGwDp"` + QoSPolicy string `json:"qosPolicy"` } // IptablesEIPCondition describes the state of an object at a certain point. @@ -520,11 +521,11 @@ type IptablesEIPCondition struct { type IptablesEipStatus struct { // +optional // +patchStrategy=merge - Ready bool `json:"ready" patchStrategy:"merge"` - IP string `json:"ip" patchStrategy:"merge"` - Redo string `json:"redo" patchStrategy:"merge"` - Nat string `json:"nat" patchStrategy:"merge"` - + Ready bool `json:"ready" patchStrategy:"merge"` + IP string `json:"ip" patchStrategy:"merge"` + Redo string `json:"redo" patchStrategy:"merge"` + Nat string `json:"nat" patchStrategy:"merge"` + QoSPolicy string `json:"qosPolicy" patchStrategy:"merge"` // Conditions represents the latest state of the object // +optional // +patchMergeKey=type @@ -1183,3 +1184,67 @@ type OvnSnatRuleList struct { Items []OvnSnatRule `json:"items"` } + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +genclient:nonNamespaced +// +resourceName=qos-policies + +type QoSPolicy struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec QoSPolicySpec `json:"spec"` + Status QoSPolicyStatus `json:"status,omitempty"` +} +type QoSPolicySpec struct { + BandwidthLimitRule QoSPolicyBandwidthLimitRule `json:"bandwidthLimitRule,omitempty"` +} + +// Condition describes the state of an object at a certain point. +// +k8s:deepcopy-gen=true +type QoSPolicyCondition struct { + // Type of condition. + Type ConditionType `json:"type"` + // Status of the condition, one of True, False, Unknown. + Status corev1.ConditionStatus `json:"status"` + // The reason for the condition's last transition. + // +optional + Reason string `json:"reason,omitempty"` + // A human readable message indicating details about the transition. + // +optional + Message string `json:"message,omitempty"` + // Last time the condition was probed + // +optional + LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` + // Last time the condition transitioned from one status to another. + // +optional + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` +} + +// BandwidthLimitRule describes the rule of an bandwidth limit. +type QoSPolicyBandwidthLimitRule struct { + IngressMax string `json:"ingressMax"` + EgressMax string `json:"egressMax"` +} + +type QoSPolicyStatus struct { + // +optional + // +patchStrategy=merge + BandwidthLimitRule QoSPolicyBandwidthLimitRule `json:"bandwidthLimitRule" patchStrategy:"merge"` + + // Conditions represents the latest state of the object + // +optional + // +patchMergeKey=type + // +patchStrategy=merge + Conditions []QoSPolicyCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type QoSPolicyList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + + Items []QoSPolicy `json:"items"` +} diff --git a/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go b/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go index 611c8ca5e12..b019bab30e0 100644 --- a/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeovn/v1/zz_generated.deepcopy.go @@ -1140,6 +1140,142 @@ func (in *ProviderNetworkStatus) DeepCopy() *ProviderNetworkStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *QoSPolicy) DeepCopyInto(out *QoSPolicy) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QoSPolicy. +func (in *QoSPolicy) DeepCopy() *QoSPolicy { + if in == nil { + return nil + } + out := new(QoSPolicy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *QoSPolicy) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *QoSPolicyBandwidthLimitRule) DeepCopyInto(out *QoSPolicyBandwidthLimitRule) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QoSPolicyBandwidthLimitRule. +func (in *QoSPolicyBandwidthLimitRule) DeepCopy() *QoSPolicyBandwidthLimitRule { + if in == nil { + return nil + } + out := new(QoSPolicyBandwidthLimitRule) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *QoSPolicyCondition) DeepCopyInto(out *QoSPolicyCondition) { + *out = *in + in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) + in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QoSPolicyCondition. +func (in *QoSPolicyCondition) DeepCopy() *QoSPolicyCondition { + if in == nil { + return nil + } + out := new(QoSPolicyCondition) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *QoSPolicyList) DeepCopyInto(out *QoSPolicyList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]QoSPolicy, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QoSPolicyList. +func (in *QoSPolicyList) DeepCopy() *QoSPolicyList { + if in == nil { + return nil + } + out := new(QoSPolicyList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *QoSPolicyList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *QoSPolicySpec) DeepCopyInto(out *QoSPolicySpec) { + *out = *in + out.BandwidthLimitRule = in.BandwidthLimitRule + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QoSPolicySpec. +func (in *QoSPolicySpec) DeepCopy() *QoSPolicySpec { + if in == nil { + return nil + } + out := new(QoSPolicySpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *QoSPolicyStatus) DeepCopyInto(out *QoSPolicyStatus) { + *out = *in + out.BandwidthLimitRule = in.BandwidthLimitRule + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]QoSPolicyCondition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QoSPolicyStatus. +func (in *QoSPolicyStatus) DeepCopy() *QoSPolicyStatus { + if in == nil { + return nil + } + out := new(QoSPolicyStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SecurityGroup) DeepCopyInto(out *SecurityGroup) { *out = *in diff --git a/pkg/client/clientset/versioned/clientset.go b/pkg/client/clientset/versioned/clientset.go index 4dfab592983..6681321648f 100644 --- a/pkg/client/clientset/versioned/clientset.go +++ b/pkg/client/clientset/versioned/clientset.go @@ -33,8 +33,7 @@ type Interface interface { KubeovnV1() kubeovnv1.KubeovnV1Interface } -// Clientset contains the clients for groups. Each group has exactly one -// version included in a Clientset. +// Clientset contains the clients for groups. type Clientset struct { *discovery.DiscoveryClient kubeovnV1 *kubeovnv1.KubeovnV1Client diff --git a/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_kubeovn_client.go b/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_kubeovn_client.go index 0e614bd52a9..7c30d812e8a 100644 --- a/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_kubeovn_client.go +++ b/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_kubeovn_client.go @@ -64,6 +64,10 @@ func (c *FakeKubeovnV1) ProviderNetworks() v1.ProviderNetworkInterface { return &FakeProviderNetworks{c} } +func (c *FakeKubeovnV1) QoSPolicies() v1.QoSPolicyInterface { + return &FakeQoSPolicies{c} +} + func (c *FakeKubeovnV1) SecurityGroups() v1.SecurityGroupInterface { return &FakeSecurityGroups{c} } diff --git a/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_qospolicy.go b/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_qospolicy.go new file mode 100644 index 00000000000..ffc16abfaa5 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/kubeovn/v1/fake/fake_qospolicy.go @@ -0,0 +1,133 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeQoSPolicies implements QoSPolicyInterface +type FakeQoSPolicies struct { + Fake *FakeKubeovnV1 +} + +var qospoliciesResource = schema.GroupVersionResource{Group: "kubeovn.io", Version: "v1", Resource: "qos-policies"} + +var qospoliciesKind = schema.GroupVersionKind{Group: "kubeovn.io", Version: "v1", Kind: "QoSPolicy"} + +// Get takes name of the qoSPolicy, and returns the corresponding qoSPolicy object, and an error if there is any. +func (c *FakeQoSPolicies) Get(ctx context.Context, name string, options v1.GetOptions) (result *kubeovnv1.QoSPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootGetAction(qospoliciesResource, name), &kubeovnv1.QoSPolicy{}) + if obj == nil { + return nil, err + } + return obj.(*kubeovnv1.QoSPolicy), err +} + +// List takes label and field selectors, and returns the list of QoSPolicies that match those selectors. +func (c *FakeQoSPolicies) List(ctx context.Context, opts v1.ListOptions) (result *kubeovnv1.QoSPolicyList, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootListAction(qospoliciesResource, qospoliciesKind, opts), &kubeovnv1.QoSPolicyList{}) + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &kubeovnv1.QoSPolicyList{ListMeta: obj.(*kubeovnv1.QoSPolicyList).ListMeta} + for _, item := range obj.(*kubeovnv1.QoSPolicyList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested qoSPolicies. +func (c *FakeQoSPolicies) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewRootWatchAction(qospoliciesResource, opts)) +} + +// Create takes the representation of a qoSPolicy and creates it. Returns the server's representation of the qoSPolicy, and an error, if there is any. +func (c *FakeQoSPolicies) Create(ctx context.Context, qoSPolicy *kubeovnv1.QoSPolicy, opts v1.CreateOptions) (result *kubeovnv1.QoSPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootCreateAction(qospoliciesResource, qoSPolicy), &kubeovnv1.QoSPolicy{}) + if obj == nil { + return nil, err + } + return obj.(*kubeovnv1.QoSPolicy), err +} + +// Update takes the representation of a qoSPolicy and updates it. Returns the server's representation of the qoSPolicy, and an error, if there is any. +func (c *FakeQoSPolicies) Update(ctx context.Context, qoSPolicy *kubeovnv1.QoSPolicy, opts v1.UpdateOptions) (result *kubeovnv1.QoSPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateAction(qospoliciesResource, qoSPolicy), &kubeovnv1.QoSPolicy{}) + if obj == nil { + return nil, err + } + return obj.(*kubeovnv1.QoSPolicy), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeQoSPolicies) UpdateStatus(ctx context.Context, qoSPolicy *kubeovnv1.QoSPolicy, opts v1.UpdateOptions) (*kubeovnv1.QoSPolicy, error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateSubresourceAction(qospoliciesResource, "status", qoSPolicy), &kubeovnv1.QoSPolicy{}) + if obj == nil { + return nil, err + } + return obj.(*kubeovnv1.QoSPolicy), err +} + +// Delete takes name of the qoSPolicy and deletes it. Returns an error if one occurs. +func (c *FakeQoSPolicies) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewRootDeleteActionWithOptions(qospoliciesResource, name, opts), &kubeovnv1.QoSPolicy{}) + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeQoSPolicies) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewRootDeleteCollectionAction(qospoliciesResource, listOpts) + + _, err := c.Fake.Invokes(action, &kubeovnv1.QoSPolicyList{}) + return err +} + +// Patch applies the patch and returns the patched qoSPolicy. +func (c *FakeQoSPolicies) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *kubeovnv1.QoSPolicy, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootPatchSubresourceAction(qospoliciesResource, name, pt, data, subresources...), &kubeovnv1.QoSPolicy{}) + if obj == nil { + return nil, err + } + return obj.(*kubeovnv1.QoSPolicy), err +} diff --git a/pkg/client/clientset/versioned/typed/kubeovn/v1/generated_expansion.go b/pkg/client/clientset/versioned/typed/kubeovn/v1/generated_expansion.go index 0f47344d9b1..ff76fa746b2 100644 --- a/pkg/client/clientset/versioned/typed/kubeovn/v1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/kubeovn/v1/generated_expansion.go @@ -36,6 +36,8 @@ type OvnSnatRuleExpansion interface{} type ProviderNetworkExpansion interface{} +type QoSPolicyExpansion interface{} + type SecurityGroupExpansion interface{} type SubnetExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/kubeovn/v1/kubeovn_client.go b/pkg/client/clientset/versioned/typed/kubeovn/v1/kubeovn_client.go index b9e3e1d0afa..9b5e6727fb0 100644 --- a/pkg/client/clientset/versioned/typed/kubeovn/v1/kubeovn_client.go +++ b/pkg/client/clientset/versioned/typed/kubeovn/v1/kubeovn_client.go @@ -37,6 +37,7 @@ type KubeovnV1Interface interface { OvnFipsGetter OvnSnatRulesGetter ProviderNetworksGetter + QoSPoliciesGetter SecurityGroupsGetter SubnetsGetter SwitchLBRulesGetter @@ -88,6 +89,10 @@ func (c *KubeovnV1Client) ProviderNetworks() ProviderNetworkInterface { return newProviderNetworks(c) } +func (c *KubeovnV1Client) QoSPolicies() QoSPolicyInterface { + return newQoSPolicies(c) +} + func (c *KubeovnV1Client) SecurityGroups() SecurityGroupInterface { return newSecurityGroups(c) } diff --git a/pkg/client/clientset/versioned/typed/kubeovn/v1/qospolicy.go b/pkg/client/clientset/versioned/typed/kubeovn/v1/qospolicy.go new file mode 100644 index 00000000000..1a051aad72c --- /dev/null +++ b/pkg/client/clientset/versioned/typed/kubeovn/v1/qospolicy.go @@ -0,0 +1,184 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1 + +import ( + "context" + "time" + + v1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + scheme "github.com/kubeovn/kube-ovn/pkg/client/clientset/versioned/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// QoSPoliciesGetter has a method to return a QoSPolicyInterface. +// A group's client should implement this interface. +type QoSPoliciesGetter interface { + QoSPolicies() QoSPolicyInterface +} + +// QoSPolicyInterface has methods to work with QoSPolicy resources. +type QoSPolicyInterface interface { + Create(ctx context.Context, qoSPolicy *v1.QoSPolicy, opts metav1.CreateOptions) (*v1.QoSPolicy, error) + Update(ctx context.Context, qoSPolicy *v1.QoSPolicy, opts metav1.UpdateOptions) (*v1.QoSPolicy, error) + UpdateStatus(ctx context.Context, qoSPolicy *v1.QoSPolicy, opts metav1.UpdateOptions) (*v1.QoSPolicy, error) + Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error + Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.QoSPolicy, error) + List(ctx context.Context, opts metav1.ListOptions) (*v1.QoSPolicyList, error) + Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.QoSPolicy, err error) + QoSPolicyExpansion +} + +// qoSPolicies implements QoSPolicyInterface +type qoSPolicies struct { + client rest.Interface +} + +// newQoSPolicies returns a QoSPolicies +func newQoSPolicies(c *KubeovnV1Client) *qoSPolicies { + return &qoSPolicies{ + client: c.RESTClient(), + } +} + +// Get takes name of the qoSPolicy, and returns the corresponding qoSPolicy object, and an error if there is any. +func (c *qoSPolicies) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.QoSPolicy, err error) { + result = &v1.QoSPolicy{} + err = c.client.Get(). + Resource("qos-policies"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of QoSPolicies that match those selectors. +func (c *qoSPolicies) List(ctx context.Context, opts metav1.ListOptions) (result *v1.QoSPolicyList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1.QoSPolicyList{} + err = c.client.Get(). + Resource("qos-policies"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested qoSPolicies. +func (c *qoSPolicies) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Resource("qos-policies"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a qoSPolicy and creates it. Returns the server's representation of the qoSPolicy, and an error, if there is any. +func (c *qoSPolicies) Create(ctx context.Context, qoSPolicy *v1.QoSPolicy, opts metav1.CreateOptions) (result *v1.QoSPolicy, err error) { + result = &v1.QoSPolicy{} + err = c.client.Post(). + Resource("qos-policies"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(qoSPolicy). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a qoSPolicy and updates it. Returns the server's representation of the qoSPolicy, and an error, if there is any. +func (c *qoSPolicies) Update(ctx context.Context, qoSPolicy *v1.QoSPolicy, opts metav1.UpdateOptions) (result *v1.QoSPolicy, err error) { + result = &v1.QoSPolicy{} + err = c.client.Put(). + Resource("qos-policies"). + Name(qoSPolicy.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(qoSPolicy). + Do(ctx). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *qoSPolicies) UpdateStatus(ctx context.Context, qoSPolicy *v1.QoSPolicy, opts metav1.UpdateOptions) (result *v1.QoSPolicy, err error) { + result = &v1.QoSPolicy{} + err = c.client.Put(). + Resource("qos-policies"). + Name(qoSPolicy.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(qoSPolicy). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the qoSPolicy and deletes it. Returns an error if one occurs. +func (c *qoSPolicies) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { + return c.client.Delete(). + Resource("qos-policies"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *qoSPolicies) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Resource("qos-policies"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched qoSPolicy. +func (c *qoSPolicies) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.QoSPolicy, err error) { + result = &v1.QoSPolicy{} + err = c.client.Patch(pt). + Resource("qos-policies"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/client/informers/externalversions/factory.go b/pkg/client/informers/externalversions/factory.go index baba604c99f..83e87c3c836 100644 --- a/pkg/client/informers/externalversions/factory.go +++ b/pkg/client/informers/externalversions/factory.go @@ -47,6 +47,11 @@ type sharedInformerFactory struct { // startedInformers is used for tracking which informers have been started. // This allows Start() to be called multiple times safely. startedInformers map[reflect.Type]bool + // wg tracks how many goroutines were started. + wg sync.WaitGroup + // shuttingDown is true when Shutdown has been called. It may still be running + // because it needs to wait for goroutines. + shuttingDown bool } // WithCustomResyncConfig sets a custom resync period for the specified informer types. @@ -107,20 +112,39 @@ func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResy return factory } -// Start initializes all requested informers. func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() + if f.shuttingDown { + return + } + for informerType, informer := range f.informers { if !f.startedInformers[informerType] { - go informer.Run(stopCh) + f.wg.Add(1) + // We need a new variable in each loop iteration, + // otherwise the goroutine would use the loop variable + // and that keeps changing. + informer := informer + go func() { + defer f.wg.Done() + informer.Run(stopCh) + }() f.startedInformers[informerType] = true } } } -// WaitForCacheSync waits for all started informers' cache were synced. +func (f *sharedInformerFactory) Shutdown() { + f.lock.Lock() + f.shuttingDown = true + f.lock.Unlock() + + // Will return immediately if there is nothing to wait for. + f.wg.Wait() +} + func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { informers := func() map[reflect.Type]cache.SharedIndexInformer { f.lock.Lock() @@ -167,11 +191,58 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal // SharedInformerFactory provides shared informers for resources in all known // API group versions. +// +// It is typically used like this: +// +// ctx, cancel := context.Background() +// defer cancel() +// factory := NewSharedInformerFactory(client, resyncPeriod) +// defer factory.WaitForStop() // Returns immediately if nothing was started. +// genericInformer := factory.ForResource(resource) +// typedInformer := factory.SomeAPIGroup().V1().SomeType() +// factory.Start(ctx.Done()) // Start processing these informers. +// synced := factory.WaitForCacheSync(ctx.Done()) +// for v, ok := range synced { +// if !ok { +// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v) +// return +// } +// } +// +// // Creating informers can also be created after Start, but then +// // Start must be called again: +// anotherGenericInformer := factory.ForResource(resource) +// factory.Start(ctx.Done()) type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory - ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + + // Start initializes all requested informers. They are handled in goroutines + // which run until the stop channel gets closed. + Start(stopCh <-chan struct{}) + + // Shutdown marks a factory as shutting down. At that point no new + // informers can be started anymore and Start will return without + // doing anything. + // + // In addition, Shutdown blocks until all goroutines have terminated. For that + // to happen, the close channel(s) that they were started with must be closed, + // either before Shutdown gets called or while it is waiting. + // + // Shutdown may be called multiple times, even concurrently. All such calls will + // block until all goroutines have terminated. + Shutdown() + + // WaitForCacheSync blocks until all started informers' caches were synced + // or the stop channel gets closed. WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool + // ForResource gives generic access to a shared informer of the matching type. + ForResource(resource schema.GroupVersionResource) (GenericInformer, error) + + // InternalInformerFor returns the SharedIndexInformer for obj using an internal + // client. + InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer + Kubeovn() kubeovn.Interface } diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index cac2c8df264..9b2121252b5 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -71,6 +71,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Kubeovn().V1().OvnSnatRules().Informer()}, nil case v1.SchemeGroupVersion.WithResource("provider-networks"): return &genericInformer{resource: resource.GroupResource(), informer: f.Kubeovn().V1().ProviderNetworks().Informer()}, nil + case v1.SchemeGroupVersion.WithResource("qos-policies"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Kubeovn().V1().QoSPolicies().Informer()}, nil case v1.SchemeGroupVersion.WithResource("security-groups"): return &genericInformer{resource: resource.GroupResource(), informer: f.Kubeovn().V1().SecurityGroups().Informer()}, nil case v1.SchemeGroupVersion.WithResource("subnets"): diff --git a/pkg/client/informers/externalversions/kubeovn/v1/interface.go b/pkg/client/informers/externalversions/kubeovn/v1/interface.go index f4af610ced6..69a81b27c44 100644 --- a/pkg/client/informers/externalversions/kubeovn/v1/interface.go +++ b/pkg/client/informers/externalversions/kubeovn/v1/interface.go @@ -42,6 +42,8 @@ type Interface interface { OvnSnatRules() OvnSnatRuleInformer // ProviderNetworks returns a ProviderNetworkInformer. ProviderNetworks() ProviderNetworkInformer + // QoSPolicies returns a QoSPolicyInformer. + QoSPolicies() QoSPolicyInformer // SecurityGroups returns a SecurityGroupInformer. SecurityGroups() SecurityGroupInformer // Subnets returns a SubnetInformer. @@ -116,6 +118,11 @@ func (v *version) ProviderNetworks() ProviderNetworkInformer { return &providerNetworkInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} } +// QoSPolicies returns a QoSPolicyInformer. +func (v *version) QoSPolicies() QoSPolicyInformer { + return &qoSPolicyInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} +} + // SecurityGroups returns a SecurityGroupInformer. func (v *version) SecurityGroups() SecurityGroupInformer { return &securityGroupInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/informers/externalversions/kubeovn/v1/qospolicy.go b/pkg/client/informers/externalversions/kubeovn/v1/qospolicy.go new file mode 100644 index 00000000000..de1c953a45b --- /dev/null +++ b/pkg/client/informers/externalversions/kubeovn/v1/qospolicy.go @@ -0,0 +1,89 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1 + +import ( + "context" + time "time" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + versioned "github.com/kubeovn/kube-ovn/pkg/client/clientset/versioned" + internalinterfaces "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions/internalinterfaces" + v1 "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// QoSPolicyInformer provides access to a shared informer and lister for +// QoSPolicies. +type QoSPolicyInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1.QoSPolicyLister +} + +type qoSPolicyInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// NewQoSPolicyInformer constructs a new informer for QoSPolicy type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewQoSPolicyInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredQoSPolicyInformer(client, resyncPeriod, indexers, nil) +} + +// NewFilteredQoSPolicyInformer constructs a new informer for QoSPolicy type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredQoSPolicyInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.KubeovnV1().QoSPolicies().List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.KubeovnV1().QoSPolicies().Watch(context.TODO(), options) + }, + }, + &kubeovnv1.QoSPolicy{}, + resyncPeriod, + indexers, + ) +} + +func (f *qoSPolicyInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredQoSPolicyInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *qoSPolicyInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&kubeovnv1.QoSPolicy{}, f.defaultInformer) +} + +func (f *qoSPolicyInformer) Lister() v1.QoSPolicyLister { + return v1.NewQoSPolicyLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/listers/kubeovn/v1/expansion_generated.go b/pkg/client/listers/kubeovn/v1/expansion_generated.go index f60b7615dbc..32e00107e81 100644 --- a/pkg/client/listers/kubeovn/v1/expansion_generated.go +++ b/pkg/client/listers/kubeovn/v1/expansion_generated.go @@ -54,6 +54,10 @@ type OvnSnatRuleListerExpansion interface{} // ProviderNetworkLister. type ProviderNetworkListerExpansion interface{} +// QoSPolicyListerExpansion allows custom methods to be added to +// QoSPolicyLister. +type QoSPolicyListerExpansion interface{} + // SecurityGroupListerExpansion allows custom methods to be added to // SecurityGroupLister. type SecurityGroupListerExpansion interface{} diff --git a/pkg/client/listers/kubeovn/v1/qospolicy.go b/pkg/client/listers/kubeovn/v1/qospolicy.go new file mode 100644 index 00000000000..dd9acf2250d --- /dev/null +++ b/pkg/client/listers/kubeovn/v1/qospolicy.go @@ -0,0 +1,68 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1 + +import ( + v1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// QoSPolicyLister helps list QoSPolicies. +// All objects returned here must be treated as read-only. +type QoSPolicyLister interface { + // List lists all QoSPolicies in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1.QoSPolicy, err error) + // Get retrieves the QoSPolicy from the index for a given name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1.QoSPolicy, error) + QoSPolicyListerExpansion +} + +// qoSPolicyLister implements the QoSPolicyLister interface. +type qoSPolicyLister struct { + indexer cache.Indexer +} + +// NewQoSPolicyLister returns a new QoSPolicyLister. +func NewQoSPolicyLister(indexer cache.Indexer) QoSPolicyLister { + return &qoSPolicyLister{indexer: indexer} +} + +// List lists all QoSPolicies in the indexer. +func (s *qoSPolicyLister) List(selector labels.Selector) (ret []*v1.QoSPolicy, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1.QoSPolicy)) + }) + return ret, err +} + +// Get retrieves the QoSPolicy from the index for a given name. +func (s *qoSPolicyLister) Get(name string) (*v1.QoSPolicy, error) { + obj, exists, err := s.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1.Resource("qospolicy"), name) + } + return obj.(*v1.QoSPolicy), nil +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index fa81063c34d..b9063f7a289 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -212,6 +212,12 @@ type Controller struct { syncSgPortsQueue workqueue.RateLimitingInterface sgKeyMutex *keymutex.KeyMutex + qosPoliciesLister kubeovnlister.QoSPolicyLister + qosPolicySynced cache.InformerSynced + addQoSPolicyQueue workqueue.RateLimitingInterface + updateQoSPolicyQueue workqueue.RateLimitingInterface + delQoSPolicyQueue workqueue.RateLimitingInterface + configMapsLister v1.ConfigMapLister configMapsSynced cache.InformerSynced @@ -266,6 +272,7 @@ func NewController(config *Configuration) *Controller { nodeInformer := informerFactory.Core().V1().Nodes() serviceInformer := informerFactory.Core().V1().Services() endpointInformer := informerFactory.Core().V1().Endpoints() + qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies() configMapInformer := cmInformerFactory.Core().V1().ConfigMaps() controller := &Controller{ @@ -387,6 +394,12 @@ func NewController(config *Configuration) *Controller { endpointsSynced: endpointInformer.Informer().HasSynced, updateEndpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateEndpoint"), + qosPoliciesLister: qosPolicyInformer.Lister(), + qosPolicySynced: qosPolicyInformer.Informer().HasSynced, + addQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addQoSPolicy"), + updateQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateQoSPolicy"), + delQoSPolicyQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delQoSPolicy"), + configMapsLister: configMapInformer.Lister(), configMapsSynced: configMapInformer.Informer().HasSynced, @@ -637,6 +650,15 @@ func NewController(config *Configuration) *Controller { }); err != nil { util.LogFatalAndExit(err, "failed to add pod iptables fip event handler") } + + if _, err = qosPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueAddQoSPolicy, + UpdateFunc: controller.enqueueUpdateQoSPolicy, + DeleteFunc: controller.enqueueDelQoSPolicy, + }); err != nil { + util.LogFatalAndExit(err, "failed to add qos policy event handler") + } + return controller } @@ -820,6 +842,10 @@ func (c *Controller) shutdown() { c.updateIptablesSnatRuleQueue.ShutDown() c.delIptablesSnatRuleQueue.ShutDown() + c.addQoSPolicyQueue.ShutDown() + c.updateQoSPolicyQueue.ShutDown() + c.delQoSPolicyQueue.ShutDown() + c.addOvnEipQueue.ShutDown() c.updateOvnEipQueue.ShutDown() c.resetOvnEipQueue.ShutDown() @@ -1028,6 +1054,10 @@ func (c *Controller) startWorkers(ctx context.Context) { go wait.Until(c.runUpdateIptablesSnatRuleWorker, time.Second, ctx.Done()) go wait.Until(c.runDelIptablesSnatRuleWorker, time.Second, ctx.Done()) + go wait.Until(c.runAddQoSPolicyWorker, time.Second, ctx.Done()) + go wait.Until(c.runUpdateQoSPolicyWorker, time.Second, ctx.Done()) + go wait.Until(c.runDelQoSPolicyWorker, time.Second, ctx.Done()) + if c.config.PodDefaultFipType == util.IptablesFip { go wait.Until(c.runAddPodAnnotatedIptablesEipWorker, time.Second, ctx.Done()) go wait.Until(c.runDelPodAnnotatedIptablesEipWorker, time.Second, ctx.Done()) diff --git a/pkg/controller/pod_iptables_eip.go b/pkg/controller/pod_iptables_eip.go index 59143ba675d..a3dd37a2101 100644 --- a/pkg/controller/pod_iptables_eip.go +++ b/pkg/controller/pod_iptables_eip.go @@ -264,7 +264,7 @@ func (c *Controller) handleAddPodAnnotatedIptablesEip(key string) error { klog.Errorf("failed to get vpc nat gw eip: %v", eipName, err) return err } - if err := c.createOrUpdateCrdEip(eipName, "", "", "", natGw); err != nil { + if err := c.createOrUpdateCrdEip(eipName, "", "", "", "", natGw); err != nil { klog.Errorf("failed to create eip %s: %v", eipName, err) return err } diff --git a/pkg/controller/qos_policy.go b/pkg/controller/qos_policy.go new file mode 100644 index 00000000000..2c8c98fbb79 --- /dev/null +++ b/pkg/controller/qos_policy.go @@ -0,0 +1,325 @@ +package controller + +import ( + "context" + "fmt" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/util" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +func (c *Controller) enqueueAddQoSPolicy(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + klog.V(3).Infof("enqueue add qos policy %s", key) + c.addQoSPolicyQueue.Add(key) +} + +func (c *Controller) enqueueUpdateQoSPolicy(old, new interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(new); err != nil { + utilruntime.HandleError(err) + return + } + oldQos := old.(*kubeovnv1.QoSPolicy) + newQos := new.(*kubeovnv1.QoSPolicy) + if !newQos.DeletionTimestamp.IsZero() { + klog.V(3).Infof("enqueue update to clean qos %s", key) + c.updateQoSPolicyQueue.Add(key) + return + } + + if oldQos.Status.BandwidthLimitRule != newQos.Spec.BandwidthLimitRule { + klog.V(3).Infof("enqueue update qos %s", key) + c.updateQoSPolicyQueue.Add(key) + return + } +} + +func (c *Controller) enqueueDelQoSPolicy(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + c.delQoSPolicyQueue.Add(key) +} + +func (c *Controller) runAddQoSPolicyWorker() { + for c.processNextAddQoSPolicyWorkItem() { + } +} + +func (c *Controller) runUpdateQoSPolicyWorker() { + for c.processNextUpdateQoSPolicyWorkItem() { + } +} + +func (c *Controller) runDelQoSPolicyWorker() { + for c.processNextDeleteQoSPolicyWorkItem() { + } +} + +func (c *Controller) processNextAddQoSPolicyWorkItem() bool { + obj, shutdown := c.addQoSPolicyQueue.Get() + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.addQoSPolicyQueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + c.addQoSPolicyQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + if err := c.handleAddQoSPolicy(key); err != nil { + c.addQoSPolicyQueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + c.addQoSPolicyQueue.Forget(obj) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + +func (c *Controller) processNextUpdateQoSPolicyWorkItem() bool { + obj, shutdown := c.updateQoSPolicyQueue.Get() + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.updateQoSPolicyQueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + c.updateQoSPolicyQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + if err := c.handleUpdateQoSPolicy(key); err != nil { + c.updateQoSPolicyQueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + c.updateQoSPolicyQueue.Forget(obj) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + +func (c *Controller) processNextDeleteQoSPolicyWorkItem() bool { + obj, shutdown := c.delQoSPolicyQueue.Get() + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.delQoSPolicyQueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + c.delQoSPolicyQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected qos in workqueue but got %#v", obj)) + return nil + } + if err := c.handleDelQoSPolicy(key); err != nil { + c.delQoSPolicyQueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + c.delQoSPolicyQueue.Forget(obj) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + +func (c *Controller) handleAddQoSPolicy(key string) error { + + c.vpcNatGwKeyMutex.Lock(key) + defer c.vpcNatGwKeyMutex.Unlock(key) + + cachedQoS, err := c.qosPoliciesLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + + if cachedQoS.Spec.BandwidthLimitRule == cachedQoS.Status.BandwidthLimitRule { + // already ok + return nil + } + klog.V(3).Infof("handle add qos %s", key) + + if err = c.patchQoSStatus(key, &cachedQoS.Spec.BandwidthLimitRule); err != nil { + klog.Errorf("failed to patch status for qos %s, %v", key, err) + return err + } + + return nil +} + +func (c *Controller) patchQoSStatus(key string, bandwithRule *kubeovnv1.QoSPolicyBandwidthLimitRule) error { + oriQoS, err := c.qosPoliciesLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + qos := oriQoS.DeepCopy() + qos.Status.BandwidthLimitRule = *bandwithRule + bytes, err := qos.Status.Bytes() + if err != nil { + return err + } + if _, err = c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Patch(context.Background(), qos.Name, + types.MergePatchType, bytes, metav1.PatchOptions{}, "status"); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to patch qos %s, %v", qos.Name, err) + return err + } + return nil +} + +func (c *Controller) handleDelQoSPoliciesFinalizer(key string) error { + cachedQoSPolicies, err := c.qosPoliciesLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + if len(cachedQoSPolicies.Finalizers) == 0 { + return nil + } + newQoSPolicies := cachedQoSPolicies.DeepCopy() + controllerutil.RemoveFinalizer(newQoSPolicies, util.ControllerName) + patch, err := util.GenerateMergePatchPayload(cachedQoSPolicies, newQoSPolicies) + if err != nil { + klog.Errorf("failed to generate patch payload for qos '%s', %v", cachedQoSPolicies.Name, err) + return err + } + if _, err := c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Patch(context.Background(), cachedQoSPolicies.Name, + types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to remove finalizer from qos '%s', %v", cachedQoSPolicies.Name, err) + return err + } + return nil +} + +func (c *Controller) handleUpdateQoSPolicy(key string) error { + c.vpcNatGwKeyMutex.Lock(key) + defer c.vpcNatGwKeyMutex.Unlock(key) + + cachedQos, err := c.qosPoliciesLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + if cachedQos.Spec.BandwidthLimitRule != cachedQos.Status.BandwidthLimitRule { + err := fmt.Errorf("not support qos %s change rule ", key) + klog.Error(err) + return err + } + // should delete + if !cachedQos.DeletionTimestamp.IsZero() { + + eips, err := c.config.KubeOvnClient.KubeovnV1().IptablesEIPs().List(context.Background(), + metav1.ListOptions{LabelSelector: fields.OneTermEqualSelector(util.QoSLabel, key).String()}) + if err != nil { + klog.Errorf("failed to get eip list, %v", err) + return err + } + if len(eips.Items) != 0 { + err = fmt.Errorf("qos policy %s is being used", key) + klog.Error(err) + return err + } + if err = c.handleDelQoSPoliciesFinalizer(key); err != nil { + klog.Errorf("failed to handle del finalizer for qos %s, %v", key, err) + return err + } + return nil + } + if err = c.handleAddQoSPolicyFinalizer(key); err != nil { + klog.Errorf("failed to handle add finalizer for qos, %v", err) + return err + } + return nil +} + +func (c *Controller) handleDelQoSPolicy(key string) error { + klog.V(3).Infof("deleted qos policy %s", key) + return nil +} + +func (c *Controller) handleAddQoSPolicyFinalizer(key string) error { + cachedQoSPolicy, err := c.qosPoliciesLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + if cachedQoSPolicy.DeletionTimestamp.IsZero() { + if util.ContainsString(cachedQoSPolicy.Finalizers, util.ControllerName) { + return nil + } + } + newQoSPolicy := cachedQoSPolicy.DeepCopy() + controllerutil.AddFinalizer(newQoSPolicy, util.ControllerName) + patch, err := util.GenerateMergePatchPayload(cachedQoSPolicy, newQoSPolicy) + if err != nil { + klog.Errorf("failed to generate patch payload for qos '%s', %v", cachedQoSPolicy.Name, err) + return err + } + if _, err := c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Patch(context.Background(), cachedQoSPolicy.Name, + types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to add finalizer for qos '%s', %v", cachedQoSPolicy.Name, err) + return err + } + return nil +} diff --git a/pkg/controller/vpc_nat_gateway.go b/pkg/controller/vpc_nat_gateway.go index 2ea8cc35825..befe37995c9 100644 --- a/pkg/controller/vpc_nat_gateway.go +++ b/pkg/controller/vpc_nat_gateway.go @@ -40,6 +40,10 @@ const ( natGwDnatDel = "dnat-del" natGwSnatAdd = "snat-add" natGwSnatDel = "snat-del" + natGwEipIngressQoSAdd = "eip-ingress-qos-add" + natGwEipIngressQoSDel = "eip-ingress-qos-del" + natGwEipEgressQoSAdd = "eip-egress-qos-add" + natGwEipEgressQoSDel = "eip-egress-qos-del" natGwSubnetFipAdd = "floating-ip-add" natGwSubnetFipDel = "floating-ip-del" natGwSubnetRouteAdd = "subnet-route-add" @@ -391,7 +395,7 @@ func (c *Controller) handleUpdateVpcEip(natGwKey string) error { for _, eip := range eips.Items { if eip.Spec.NatGwDp == natGwKey && eip.Status.Redo != NAT_GW_CREATED_AT && eip.Annotations[util.VpcNatAnnotation] == "" { klog.V(3).Infof("redo eip %s", eip.Name) - if err = c.patchEipStatus(eip.Name, "", NAT_GW_CREATED_AT, "", false); err != nil { + if err = c.patchEipStatus(eip.Name, "", NAT_GW_CREATED_AT, "", "", false); err != nil { klog.Errorf("failed to update eip '%s' to make sure applied, %v", eip.Name, err) return err } diff --git a/pkg/controller/vpc_nat_gw_eip.go b/pkg/controller/vpc_nat_gw_eip.go index ee2a611eb5d..39a0506d259 100644 --- a/pkg/controller/vpc_nat_gw_eip.go +++ b/pkg/controller/vpc_nat_gw_eip.go @@ -3,6 +3,9 @@ package controller import ( "context" "fmt" + "net" + "strings" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -10,9 +13,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "net" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "strings" kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" "github.com/kubeovn/kube-ovn/pkg/ovs" @@ -45,7 +46,8 @@ func (c *Controller) enqueueUpdateIptablesEip(old, new interface{}) { oldEip := old.(*kubeovnv1.IptablesEIP) newEip := new.(*kubeovnv1.IptablesEIP) if !newEip.DeletionTimestamp.IsZero() || - oldEip.Status.Redo != newEip.Status.Redo { + oldEip.Status.Redo != newEip.Status.Redo || + oldEip.Spec.QoSPolicy != newEip.Spec.QoSPolicy { c.updateIptablesEipQueue.Add(key) } c.updateSubnetStatusQueue.Add(util.VpcExternalNet) @@ -249,7 +251,14 @@ func (c *Controller) handleAddIptablesEip(key string) error { klog.Errorf("failed to create eip '%s' in pod, %v", key, err) return err } - if err = c.createOrUpdateCrdEip(key, v4ip, v6ip, mac, cachedEip.Spec.NatGwDp); err != nil { + + if cachedEip.Spec.QoSPolicy != "" { + if err = c.addEipQoS(cachedEip, cachedEip.Status.IP); err != nil { + klog.Errorf("failed to add qos '%s' in pod, %v", key, err) + return err + } + } + if err = c.createOrUpdateCrdEip(key, v4ip, v6ip, mac, cachedEip.Spec.NatGwDp, cachedEip.Spec.QoSPolicy); err != nil { klog.Errorf("failed to update eip %s, %v", key, err) return err } @@ -342,6 +351,12 @@ func (c *Controller) handleUpdateIptablesEip(key string) error { return err } } + if cachedEip.Status.QoSPolicy != "" { + if err = c.delEipQoS(cachedEip, cachedEip.Status.IP); err != nil { + klog.Errorf("failed to del qos '%s' in pod, %v", key, err) + return err + } + } if err = c.handleDelIptablesEipFinalizer(key); err != nil { klog.Errorf("failed to handle del finalizer for eip %s, %v", key, err) return err @@ -361,6 +376,33 @@ func (c *Controller) handleUpdateIptablesEip(key string) error { klog.Error(err) return err } + + // update qos + if cachedEip.Status.QoSPolicy != cachedEip.Spec.QoSPolicy { + if cachedEip.Status.QoSPolicy != "" { + if err = c.delEipQoS(cachedEip, cachedEip.Status.IP); err != nil { + klog.Errorf("failed to del qos '%s' in pod, %v", key, err) + return err + } + } + if cachedEip.Spec.QoSPolicy != "" { + if err = c.addEipQoS(cachedEip, cachedEip.Status.IP); err != nil { + klog.Errorf("failed to add qos '%s' in pod, %v", key, err) + return err + } + } + + if err = c.qosLabelEIP(key, cachedEip.Spec.QoSPolicy); err != nil { + klog.Errorf("failed to label qos in eip, %v", err) + return err + } + + if err = c.patchEipQoSStatus(key, cachedEip.Spec.QoSPolicy); err != nil { + klog.Errorf("failed to patch status for eip %s, %v", key, err) + return err + } + } + // redo if !cachedEip.Status.Ready && cachedEip.Status.Redo != "" && @@ -380,7 +422,15 @@ func (c *Controller) handleUpdateIptablesEip(key string) error { klog.Errorf("failed to create eip, %v", err) return err } - if err = c.patchEipStatus(key, "", "", "", true); err != nil { + + if cachedEip.Spec.QoSPolicy != "" { + if err = c.addEipQoS(cachedEip, cachedEip.Status.IP); err != nil { + klog.Errorf("failed to add qos '%s' in pod, %v", key, err) + return err + } + } + + if err = c.patchEipStatus(key, "", "", "", cachedEip.Spec.QoSPolicy, true); err != nil { klog.Errorf("failed to patch status for eip %s, %v", key, err) return err } @@ -442,6 +492,100 @@ func (c *Controller) deleteEipInPod(dp, v4Cidr string) error { return nil } +// add tc rule for eip in nat gw pod +func (c *Controller) addEipQoS(eip *kubeovnv1.IptablesEIP, v4ip string) error { + var err error + + qosPolicy, err := c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Get(context.Background(), eip.Spec.QoSPolicy, metav1.GetOptions{}) + + if err != nil { + klog.Errorf("get qos policy %s failed: %v", eip.Spec.QoSPolicy, err) + return err + } + + ingressRate := qosPolicy.Spec.BandwidthLimitRule.IngressMax + egressRate := qosPolicy.Spec.BandwidthLimitRule.EgressMax + // set ingress qos + if ingressRate != "" { + if err = c.addEipQoSInPod(eip.Spec.NatGwDp, v4ip, util.QoSDirectionIngress, ingressRate); err != nil { + klog.Errorf("failed to set ingress eip '%s' qos in pod, %v", eip.Name, err) + return err + } + } + + // set egress qos + if egressRate != "" { + if err = c.addEipQoSInPod(eip.Spec.NatGwDp, v4ip, util.QoSDirectionEgress, egressRate); err != nil { + klog.Errorf("failed to set egress eip '%s' qos in pod, %v", eip.Name, err) + return err + } + } + return nil +} + +// del tc rule for eip in nat gw pod +func (c *Controller) delEipQoS(eip *kubeovnv1.IptablesEIP, v4ip string) error { + var err error + + // del ingress qos + if err = c.delEipQoSInPod(eip.Spec.NatGwDp, v4ip, util.QoSDirectionIngress); err != nil { + klog.Errorf("failed to del ingress eip '%s' qos in pod, %v", eip.Name, err) + return err + } + + // del egress qos + if err = c.delEipQoSInPod(eip.Spec.NatGwDp, v4ip, util.QoSDirectionEgress); err != nil { + klog.Errorf("failed to del egress eip '%s' qos in pod, %v", eip.Name, err) + return err + } + return nil +} + +func (c *Controller) addEipQoSInPod(dp, v4ip, direction, rate string) error { + var operation string + gwPod, err := c.getNatGwPod(dp) + if err != nil { + return err + } + var addRules []string + rule := fmt.Sprintf("%s,%s", v4ip, rate) + addRules = append(addRules, rule) + + switch direction { + case util.QoSDirectionIngress: + operation = natGwEipIngressQoSAdd + case util.QoSDirectionEgress: + operation = natGwEipEgressQoSAdd + } + + if err = c.execNatGwRules(gwPod, operation, addRules); err != nil { + return err + } + return nil +} + +func (c *Controller) delEipQoSInPod(dp, v4ip, direction string) error { + var operation string + gwPod, err := c.getNatGwPod(dp) + if err != nil { + return err + } + var addRules []string + addRules = append(addRules, v4ip) + + switch direction { + case util.QoSDirectionIngress: + operation = natGwEipIngressQoSDel + case util.QoSDirectionEgress: + operation = natGwEipEgressQoSDel + } + + if err = c.execNatGwRules(gwPod, operation, addRules); err != nil { + return err + } + return nil +} + func (c *Controller) acquireStaticEip(name, namespace, nicName, ip string) (string, string, string, error) { checkConflict := true var v4ip, v6ip, mac string @@ -512,7 +656,7 @@ func (c *Controller) GetGwBySubnet(name string) (string, string, error) { } } -func (c *Controller) createOrUpdateCrdEip(key, v4ip, v6ip, mac, natGwDp string) error { +func (c *Controller) createOrUpdateCrdEip(key, v4ip, v6ip, mac, natGwDp, qos string) error { cachedEip, err := c.iptablesEipsLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { @@ -562,6 +706,7 @@ func (c *Controller) createOrUpdateCrdEip(key, v4ip, v6ip, mac, natGwDp string) // TODO:// ipv6 } eip.Status.Ready = true + eip.Status.QoSPolicy = qos bytes, err := eip.Status.Bytes() if err != nil { return err @@ -590,6 +735,9 @@ func (c *Controller) createOrUpdateCrdEip(key, v4ip, v6ip, mac, natGwDp string) eip.Labels[util.VpcNatGatewayNameLabel] = natGwDp needUpdateLabel = true } + if eip.Spec.QoSPolicy != "" && eip.Labels[util.QoSLabel] != eip.Spec.QoSPolicy { + eip.Labels[util.QoSLabel] = eip.Spec.QoSPolicy + } if needUpdateLabel { if err := c.updateIptableLabels(eip.Name, op, "eip", eip.Labels); err != nil { return err @@ -697,7 +845,41 @@ func (c *Controller) patchEipIP(key, v4ip string) error { return nil } -func (c *Controller) patchEipStatus(key, v4ip, redo, nat string, ready bool) error { +func (c *Controller) patchEipQoSStatus(key, qos string) error { + var changed bool + oriEip, err := c.iptablesEipsLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + eip := oriEip.DeepCopy() + + // update status.qosPolicy + if eip.Status.QoSPolicy != qos { + eip.Status.QoSPolicy = qos + changed = true + } + + if changed { + bytes, err := eip.Status.Bytes() + if err != nil { + return err + } + if _, err = c.config.KubeOvnClient.KubeovnV1().IptablesEIPs().Patch(context.Background(), key, types.MergePatchType, + bytes, metav1.PatchOptions{}, "status"); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to patch eip %s, %v", eip.Name, err) + return err + } + } + return nil +} + +func (c *Controller) patchEipStatus(key, v4ip, redo, nat, qos string, ready bool) error { oriEip, err := c.iptablesEipsLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { @@ -726,6 +908,11 @@ func (c *Controller) patchEipStatus(key, v4ip, redo, nat string, ready bool) err changed = true } + if qos != "" && eip.Status.QoSPolicy != qos { + eip.Status.QoSPolicy = qos + changed = true + } + if changed { bytes, err := eip.Status.Bytes() if err != nil { @@ -846,3 +1033,34 @@ func (c *Controller) natLabelEip(eipName, natName string) error { } return err } + +func (c *Controller) qosLabelEIP(eipName, qosName string) error { + oriEip, err := c.iptablesEipsLister.Get(eipName) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + eip := oriEip.DeepCopy() + var needUpdateLabel bool + var op string + if len(eip.Labels) == 0 { + op = "add" + needUpdateLabel = true + eip.Labels = map[string]string{ + util.QoSLabel: qosName, + } + } else if eip.Labels[util.QoSLabel] != qosName { + op = "replace" + needUpdateLabel = true + eip.Labels[util.QoSLabel] = qosName + } + if needUpdateLabel { + if err := c.updateIptableLabels(eip.Name, op, "eip", eip.Labels); err != nil { + return err + } + } + + return err +} diff --git a/pkg/controller/vpc_nat_gw_nat.go b/pkg/controller/vpc_nat_gw_nat.go index 8edfbd84f2e..6baac334df3 100644 --- a/pkg/controller/vpc_nat_gw_nat.go +++ b/pkg/controller/vpc_nat_gw_nat.go @@ -1303,7 +1303,7 @@ func (c *Controller) redoFip(key, redo string, eipReady bool) error { } if redo != "" && redo != fip.Status.Redo { if !eipReady { - if err = c.patchEipStatus(fip.Spec.EIP, "", redo, "", false); err != nil { + if err = c.patchEipStatus(fip.Spec.EIP, "", redo, "", "", false); err != nil { return err } } @@ -1431,7 +1431,7 @@ func (c *Controller) redoDnat(key, redo string, eipReady bool) error { } if redo != "" && redo != dnat.Status.Redo { if !eipReady { - if err = c.patchEipStatus(dnat.Spec.EIP, "", redo, "", false); err != nil { + if err = c.patchEipStatus(dnat.Spec.EIP, "", redo, "", "", false); err != nil { return err } } @@ -1541,7 +1541,7 @@ func (c *Controller) redoSnat(key, redo string, eipReady bool) error { } if redo != "" && redo != snat.Status.Redo { if !eipReady { - if err = c.patchEipStatus(snat.Spec.EIP, "", redo, "", false); err != nil { + if err = c.patchEipStatus(snat.Spec.EIP, "", redo, "", "", false); err != nil { return err } } diff --git a/pkg/util/const.go b/pkg/util/const.go index 9928de1a78d..95915fc6341 100644 --- a/pkg/util/const.go +++ b/pkg/util/const.go @@ -111,6 +111,7 @@ const ( VpcNatGatewayNameLabel = "ovn.kubernetes.io/vpc-nat-gw-name" VpcLbLabel = "ovn.kubernetes.io/vpc_lb" VpcDnsNameLabel = "ovn.kubernetes.io/vpc-dns" + QoSLabel = "ovn.kubernetes.io/qos" NetworkPolicyLogAnnotation = "ovn.kubernetes.io/enable_log" ProtocolTCP = "tcp" @@ -235,4 +236,7 @@ const ( DefaultServiceSessionStickinessTimeout = 10800 OvnSubnetGatewayIptables = "ovn-subnet-gateway" + + QoSDirectionIngress = "ingress" + QoSDirectionEgress = "egress" ) diff --git a/yamls/crd.yaml b/yamls/crd.yaml index 385c8b957e0..9c8c67c3a21 100644 --- a/yamls/crd.yaml +++ b/yamls/crd.yaml @@ -249,6 +249,8 @@ spec: type: string redo: type: string + qosPolicy: + type: string conditions: type: array items: @@ -277,6 +279,8 @@ spec: type: string natGwDp: type: string + qosPolicy: + type: string --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -1572,3 +1576,72 @@ spec: status: {} conversion: strategy: None +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: qos-policies.kubeovn.io +spec: + group: kubeovn.io + names: + plural: qos-policies + singular: qos-policy + shortNames: + - qos + kind: QoSPolicy + listKind: QoSPolicyList + scope: Cluster + versions: + - name: v1 + served: true + storage: true + subresources: + status: {} + additionalPrinterColumns: + - jsonPath: .spec.bandwidthLimitRule.ingressMax + name: IngressMax + type: string + - jsonPath: .spec.bandwidthLimitRule.egressMax + name: EgressMax + type: string + schema: + openAPIV3Schema: + type: object + properties: + status: + type: object + properties: + bandwidthLimitRule: + type: object + properties: + ingressMax: + type: string + egressMax: + type: string + conditions: + type: array + items: + type: object + properties: + type: + type: string + status: + type: string + reason: + type: string + message: + type: string + lastUpdateTime: + type: string + lastTransitionTime: + type: string + spec: + type: object + properties: + bandwidthLimitRule: + type: object + properties: + ingressMax: + type: string + egressMax: + type: string diff --git a/yamls/ovn-dpdk.yaml b/yamls/ovn-dpdk.yaml index 1a5f05cb590..2622c57d076 100644 --- a/yamls/ovn-dpdk.yaml +++ b/yamls/ovn-dpdk.yaml @@ -41,6 +41,8 @@ rules: - switch-lb-rules/status - vpc-dnses - vpc-dnses/status + - qos-policies + - qos-policies/status verbs: - "*" - apiGroups: diff --git a/yamls/ovn-ha.yaml b/yamls/ovn-ha.yaml index 76491350701..a9b0a27692c 100644 --- a/yamls/ovn-ha.yaml +++ b/yamls/ovn-ha.yaml @@ -40,6 +40,8 @@ rules: - switch-lb-rules/status - vpc-dnses - vpc-dnses/status + - qos-policies + - qos-policies/status verbs: - "*" - apiGroups: From a92eb9eea989579478fcccb9b1e604030a52bd83 Mon Sep 17 00:00:00 2001 From: shane Date: Mon, 3 Apr 2023 02:01:11 +0000 Subject: [PATCH 2/5] delete filter rule by filterID instead of filtertype --- dist/images/vpcnatgateway/nat-gateway.sh | 31 ++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/dist/images/vpcnatgateway/nat-gateway.sh b/dist/images/vpcnatgateway/nat-gateway.sh index 4b490cc357c..91adaf42436 100644 --- a/dist/images/vpcnatgateway/nat-gateway.sh +++ b/dist/images/vpcnatgateway/nat-gateway.sh @@ -285,7 +285,25 @@ function eip_ingress_qos_del() { qdisc_id=$(tc qdisc show dev net1 ingress | awk '{print $3}') tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip if [ "$?" -eq 0 ];then - exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip dst $v4ip" + # get filterID(800::800) via IP(x.x.x.x) in the following text: + # filter protocol ip u32 chain 0 + # filter protocol ip u32 chain 0 fh 800: ht divisor 1 + # filter protocol ip u32 chain 0 fh 800::800 order 2048 key ht 800 bkt 0 *flowid :1 not_in_hw + # match IP src x.x.x.x/32 + # police 0x1 rate 1Mbit burst 1Mb mtu 2Kb action drop overhead 0b linklayer ethernet + # ref 1 bind 1 installed 392 sec used 392 sec firstused 18818153 sec + + # Sent 0 bytes 0 pkts (dropped 0, overlimits 0) + ipList=$(tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep "match IP dst" | awk '{print $4}') + i=0 + for line in $ipList; do + i=$((i+1)) + if echo "$line" | grep $v4ip; then + break + fi + done + filterID=$(tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep "filter protocol ip u32 \(fh\|chain [0-9]\+ fh\) \(\w\+::\w\+\) *" | awk '{print $8}' | sed -n $i"p") + exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 handle $filterID u32" fi done } @@ -299,7 +317,16 @@ function eip_egress_qos_del() { qdisc_id="1:0" tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip if [ "$?" -eq 0 ];then - exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip src $v4ip" + ipList=$(tc -p -s -d filter show dev net1 parent 1: prio 1 | grep "match IP src" | awk '{print $4}') + i=0 + for line in $ipList; do + i=$((i+1)) + if echo "$line" | grep $v4ip; then + break + fi + done + filterID=$(tc -p -s -d filter show dev net1 parent 1: prio 1 | grep "filter protocol ip u32 \(fh\|chain [0-9]\+ fh\) \(\w\+::\w\+\) *" | awk '{print $8}' | sed -n $i"p") + exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 handle $filterID u32" fi done } From 667e57d7319d2cf8eec8da119898d243b0eb2107 Mon Sep 17 00:00:00 2001 From: shane Date: Mon, 3 Apr 2023 03:23:51 +0000 Subject: [PATCH 3/5] refactor delete filter logic --- dist/images/vpcnatgateway/nat-gateway.sh | 81 ++++++++++++------------ 1 file changed, 40 insertions(+), 41 deletions(-) diff --git a/dist/images/vpcnatgateway/nat-gateway.sh b/dist/images/vpcnatgateway/nat-gateway.sh index 91adaf42436..c422a7b69ee 100644 --- a/dist/images/vpcnatgateway/nat-gateway.sh +++ b/dist/images/vpcnatgateway/nat-gateway.sh @@ -241,21 +241,52 @@ function del_dnat() { } +# example usage: +# delete_tc_filter "1:0" "192.168.1.1" "src" +function delete_tc_filter() { + qdisc_id=$1 + v4ip=$2 + direction=$3 + + # tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 + # output like this: + # filter protocol ip u32 chain 0 + # filter protocol ip u32 chain 0 fh 800: ht divisor 1 + # filter protocol ip u32 chain 0 fh 800::800 order 2048 key ht 800 bkt 0 *flowid :1 not_in_hw + # match EIP dst x.x.x.1/32 + # police 0x1 rate 1Mbit burst 1Mb mtu 2Kb action drop overhead 0b linklayer ethernet + # ref 1 bind 1 installed 392 sec used 392 sec firstused 18818153 sec + # Sent 0 bytes 0 pkts (dropped 0, overlimits 0) + + # get the corresponding filterID by the EIP, and use the filterID to delete the corresponding filtering rule. + ipList=$(tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep "match IP $direction" | awk '{print $4}') + i=0 + for line in $ipList; do + i=$((i+1)) + if echo "$line" | grep $v4ip; then + filterID=$(tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep "filter protocol ip u32 \(fh\|chain [0-9]\+ fh\) \(\w\+::\w\+\) *" | awk '{print $8}' | sed -n $i"p") + exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 handle $filterID u32" + break + fi + done +} + function eip_ingress_qos_add() { for rule in $@ do arr=(${rule//,/ }) v4ip=(${arr[0]//\// }) rate=${arr[1]} + direction="dst" tc qdisc add dev net1 ingress 2>/dev/nul || true # get qdisc id qdisc_id=$(tc qdisc show dev net1 ingress | awk '{print $3}') # del old filter tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip if [ "$?" -eq 0 ];then - exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip dst $v4ip" + delete_tc_filter $qdisc_id $v4ip $direction fi - exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip dst $v4ip police rate "$rate"Mbit burst "$rate"Mb drop flowid :1" + exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip $direction $v4ip police rate "$rate"Mbit burst "$rate"Mb drop flowid :1" done } @@ -266,13 +297,14 @@ function eip_egress_qos_add() { v4ip=(${arr[0]//\// }) rate=${arr[1]} qdisc_id="1:0" + direction="src" tc qdisc add dev net1 root handle $qdisc_id htb 2>/dev/nul || true # del old filter tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip if [ "$?" -eq 0 ];then - exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip src $v4ip" + delete_tc_filter $qdisc_id $v4ip $direction fi - exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip src $v4ip police rate "$rate"Mbit burst "$rate"Mb drop flowid :1" + exec_cmd "tc filter add dev net1 parent $qdisc_id protocol ip prio 1 u32 match ip $direction $v4ip police rate "$rate"Mbit burst "$rate"Mb drop flowid :1" done } @@ -281,30 +313,9 @@ function eip_ingress_qos_del() { do arr=(${rule//,/ }) v4ip=(${arr[0]//\// }) - + direction="dst" qdisc_id=$(tc qdisc show dev net1 ingress | awk '{print $3}') - tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip - if [ "$?" -eq 0 ];then - # get filterID(800::800) via IP(x.x.x.x) in the following text: - # filter protocol ip u32 chain 0 - # filter protocol ip u32 chain 0 fh 800: ht divisor 1 - # filter protocol ip u32 chain 0 fh 800::800 order 2048 key ht 800 bkt 0 *flowid :1 not_in_hw - # match IP src x.x.x.x/32 - # police 0x1 rate 1Mbit burst 1Mb mtu 2Kb action drop overhead 0b linklayer ethernet - # ref 1 bind 1 installed 392 sec used 392 sec firstused 18818153 sec - - # Sent 0 bytes 0 pkts (dropped 0, overlimits 0) - ipList=$(tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep "match IP dst" | awk '{print $4}') - i=0 - for line in $ipList; do - i=$((i+1)) - if echo "$line" | grep $v4ip; then - break - fi - done - filterID=$(tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep "filter protocol ip u32 \(fh\|chain [0-9]\+ fh\) \(\w\+::\w\+\) *" | awk '{print $8}' | sed -n $i"p") - exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 handle $filterID u32" - fi + delete_tc_filter $qdisc_id $v4ip $direction done } @@ -313,21 +324,9 @@ function eip_egress_qos_del() { do arr=(${rule//,/ }) v4ip=(${arr[0]//\// }) - + direction="src" qdisc_id="1:0" - tc -p -s -d filter show dev net1 parent $qdisc_id prio 1 | grep -w $v4ip - if [ "$?" -eq 0 ];then - ipList=$(tc -p -s -d filter show dev net1 parent 1: prio 1 | grep "match IP src" | awk '{print $4}') - i=0 - for line in $ipList; do - i=$((i+1)) - if echo "$line" | grep $v4ip; then - break - fi - done - filterID=$(tc -p -s -d filter show dev net1 parent 1: prio 1 | grep "filter protocol ip u32 \(fh\|chain [0-9]\+ fh\) \(\w\+::\w\+\) *" | awk '{print $8}' | sed -n $i"p") - exec_cmd "tc filter del dev net1 parent $qdisc_id protocol ip prio 1 handle $filterID u32" - fi + delete_tc_filter $qdisc_id $v4ip $direction done } From cea63884c39afbe1b1c05922011c2c09243a189d Mon Sep 17 00:00:00 2001 From: shane Date: Wed, 5 Apr 2023 13:31:11 +0000 Subject: [PATCH 4/5] add eip-qos doc --- docs/eip-qos.md | 50 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 docs/eip-qos.md diff --git a/docs/eip-qos.md b/docs/eip-qos.md new file mode 100644 index 00000000000..83c30259a98 --- /dev/null +++ b/docs/eip-qos.md @@ -0,0 +1,50 @@ +# EIP QoS + +EIP QoS is a feature in Kube-OVN that allows for dynamic configuration of rate limits for both Ingress and Egress traffic on custom VPC EIPs. + +## Creating a QoS Policy + +To create a QoS Policy, use the following YAML configuration: + +``` +apiVersion: kubeovn.io/v1 +kind: QoSPolicy +metadata: + name: qos-example +spec: + bandwidthLimitRule: + ingressMax: "1" # Mbps + egressMax: "1" # Mbps +``` + +It is allowed to limit only one direction, just like this: + +``` +apiVersion: kubeovn.io/v1 +kind: QoSPolicy +metadata: + name: qos-example +spec: + bandwidthLimitRule: + ingressMax: "1" # Mbps +``` + +## Enabling EIP QoS +To enable EIP QoS, use the following YAML configuration: + +``` +kind: IptablesEIP +apiVersion: kubeovn.io/v1 +metadata: + name: eip-random +spec: + natGwDp: gw1 + qosPolicy: qos-example +``` + +You can also add or update the `.spec.qosPolicy` field to an existing EIP. + +## Limitations + +* After creating a QoS Policy, the bandwidth limit rules cannot be changed. If you need to set new rate limit rules for an EIP, you can update a new QoS Policy to the `IptablesEIP.spec.qosPolicy` field. +* You can only delete a QoS Policy if it is not currently in use. Therefore, before deleting a QoS Policy, you must first remove the `IptablesEIP.spec.qosPolicy` field from any associated IptablesEIP. From 0733b90347c4979b4f50ca608b887f885a45be25 Mon Sep 17 00:00:00 2001 From: shane Date: Mon, 10 Apr 2023 09:45:50 +0000 Subject: [PATCH 5/5] fix typo --- dist/images/vpcnatgateway/nat-gateway.sh | 2 +- pkg/controller/vpc_nat_gw_eip.go | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/dist/images/vpcnatgateway/nat-gateway.sh b/dist/images/vpcnatgateway/nat-gateway.sh index c422a7b69ee..1ea7fec5b67 100644 --- a/dist/images/vpcnatgateway/nat-gateway.sh +++ b/dist/images/vpcnatgateway/nat-gateway.sh @@ -253,7 +253,7 @@ function delete_tc_filter() { # filter protocol ip u32 chain 0 # filter protocol ip u32 chain 0 fh 800: ht divisor 1 # filter protocol ip u32 chain 0 fh 800::800 order 2048 key ht 800 bkt 0 *flowid :1 not_in_hw - # match EIP dst x.x.x.1/32 + # match IP dst x.x.x.1/32 # police 0x1 rate 1Mbit burst 1Mb mtu 2Kb action drop overhead 0b linklayer ethernet # ref 1 bind 1 installed 392 sec used 392 sec firstused 18818153 sec # Sent 0 bytes 0 pkts (dropped 0, overlimits 0) diff --git a/pkg/controller/vpc_nat_gw_eip.go b/pkg/controller/vpc_nat_gw_eip.go index 39a0506d259..55391a5ecf0 100644 --- a/pkg/controller/vpc_nat_gw_eip.go +++ b/pkg/controller/vpc_nat_gw_eip.go @@ -495,9 +495,7 @@ func (c *Controller) deleteEipInPod(dp, v4Cidr string) error { // add tc rule for eip in nat gw pod func (c *Controller) addEipQoS(eip *kubeovnv1.IptablesEIP, v4ip string) error { var err error - qosPolicy, err := c.config.KubeOvnClient.KubeovnV1().QoSPolicies().Get(context.Background(), eip.Spec.QoSPolicy, metav1.GetOptions{}) - if err != nil { klog.Errorf("get qos policy %s failed: %v", eip.Spec.QoSPolicy, err) return err @@ -570,8 +568,8 @@ func (c *Controller) delEipQoSInPod(dp, v4ip, direction string) error { if err != nil { return err } - var addRules []string - addRules = append(addRules, v4ip) + var delRules []string + delRules = append(delRules, v4ip) switch direction { case util.QoSDirectionIngress: @@ -580,7 +578,7 @@ func (c *Controller) delEipQoSInPod(dp, v4ip, direction string) error { operation = natGwEipEgressQoSDel } - if err = c.execNatGwRules(gwPod, operation, addRules); err != nil { + if err = c.execNatGwRules(gwPod, operation, delRules); err != nil { return err } return nil