Skip to content

Commit

Permalink
Change to use weak reference
Browse files Browse the repository at this point in the history
Use a channel to pass serviceGroupID update/delete events between ofClient and agent/NPController, to avoid a race condition.

Signed-off-by: wgrayson <wgrayson@vmware.com>
  • Loading branch information
GraysonWu committed Sep 30, 2021
1 parent 09f2ac1 commit 195d2ee
Show file tree
Hide file tree
Showing 20 changed files with 104 additions and 283 deletions.
21 changes: 0 additions & 21 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4756,27 +4756,6 @@ webhooks:
scope: Cluster
sideEffects: None
timeoutSeconds: 5
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: antrea
namespace: kube-system
path: /validate/service
name: servicevalidator.antrea.io
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- DELETE
resources:
- services
scope: Namespaced
sideEffects: None
timeoutSeconds: 5
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
Expand Down
21 changes: 0 additions & 21 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4758,27 +4758,6 @@ webhooks:
scope: Cluster
sideEffects: None
timeoutSeconds: 5
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: antrea
namespace: kube-system
path: /validate/service
name: servicevalidator.antrea.io
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- DELETE
resources:
- services
scope: Namespaced
sideEffects: None
timeoutSeconds: 5
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
Expand Down
21 changes: 0 additions & 21 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4756,27 +4756,6 @@ webhooks:
scope: Cluster
sideEffects: None
timeoutSeconds: 5
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: antrea
namespace: kube-system
path: /validate/service
name: servicevalidator.antrea.io
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- DELETE
resources:
- services
scope: Namespaced
sideEffects: None
timeoutSeconds: 5
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
Expand Down
21 changes: 0 additions & 21 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4805,27 +4805,6 @@ webhooks:
scope: Cluster
sideEffects: None
timeoutSeconds: 5
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: antrea
namespace: kube-system
path: /validate/service
name: servicevalidator.antrea.io
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- DELETE
resources:
- services
scope: Namespaced
sideEffects: None
timeoutSeconds: 5
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
Expand Down
21 changes: 0 additions & 21 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4761,27 +4761,6 @@ webhooks:
scope: Cluster
sideEffects: None
timeoutSeconds: 5
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: antrea
namespace: kube-system
path: /validate/service
name: servicevalidator.antrea.io
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- DELETE
resources:
- services
scope: Namespaced
sideEffects: None
timeoutSeconds: 5
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
Expand Down
15 changes: 0 additions & 15 deletions build/yamls/base/controller.yml
Original file line number Diff line number Diff line change
Expand Up @@ -168,21 +168,6 @@ webhooks:
admissionReviewVersions: ["v1", "v1beta1"]
sideEffects: None
timeoutSeconds: 5
- name: "servicevalidator.antrea.io"
clientConfig:
service:
name: "antrea"
namespace: "kube-system"
path: "/validate/service"
rules:
- operations: ["DELETE"]
apiGroups: [""]
apiVersions: ["v1"]
resources: ["services"]
scope: "Namespaced"
admissionReviewVersions: ["v1", "v1beta1"]
sideEffects: None
timeoutSeconds: 5
---
apiVersion: apiregistration.k8s.io/v1
kind: APIService
Expand Down
5 changes: 4 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ func run(o *Options) error {
ovsDatapathType := ovsconfig.OVSDatapathType(o.config.OVSDatapathType)
ovsBridgeClient := ovsconfig.NewOVSBridge(o.config.OVSBridge, ovsDatapathType, ovsdbConnection)
ovsBridgeMgmtAddr := ofconfig.GetMgmtAddress(o.config.OVSRunDir, o.config.OVSBridge)
serviceGroupIDUpdates := make(chan string, 100)
ofClient := openflow.NewClient(o.config.OVSBridge, ovsBridgeMgmtAddr, ovsDatapathType,
features.DefaultFeatureGate.Enabled(features.AntreaProxy),
features.DefaultFeatureGate.Enabled(features.AntreaPolicy),
features.DefaultFeatureGate.Enabled(features.Egress),
features.DefaultFeatureGate.Enabled(features.FlowExporter),
o.config.AntreaProxy.ProxyAll)
o.config.AntreaProxy.ProxyAll,
serviceGroupIDUpdates)

_, serviceCIDRNet, _ := net.ParseCIDR(o.config.ServiceCIDR)
var serviceCIDRNetv6 *net.IPNet
Expand Down Expand Up @@ -241,6 +243,7 @@ func run(o *Options) error {
ifaceStore,
nodeConfig.Name,
entityUpdates,
serviceGroupIDUpdates,
antreaPolicyEnabled,
statusManagerEnabled,
loggingEnabled,
Expand Down
1 change: 0 additions & 1 deletion cmd/antrea-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ var allowedPaths = []string{
"/validate/clustergroup",
"/validate/externalippool",
"/validate/egress",
"/validate/service",
"/convert/clustergroup",
}

Expand Down
54 changes: 46 additions & 8 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ import (
v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
"antrea.io/antrea/pkg/querier"
"antrea.io/antrea/pkg/util/k8s"
)

const (
RuleIDLength = 16
appliedToGroupIndex = "appliedToGroup"
addressGroupIndex = "addressGroup"
policyIndex = "policy"
toServicesIndex = "toServices"
)

// rule is the struct stored in ruleCache, it contains necessary information
Expand Down Expand Up @@ -156,6 +158,10 @@ type ruleCache struct {

// entityUpdates is a channel for receiving entity (e.g. Pod) updates from CNIServer.
entityUpdates <-chan antreatypes.EntityReference

// serviceGroupIDUpdates is a channel for receiving events from ofClient when a
// groupID is assigned to or released from a Service.
serviceGroupIDUpdates <-chan string
}

func (c *ruleCache) getNetworkPolicies(npFilter *querier.NetworkPolicyQueryFilter) []v1beta.NetworkPolicy {
Expand Down Expand Up @@ -318,21 +324,35 @@ func policyIndexFunc(obj interface{}) ([]string, error) {
return []string{string(rule.PolicyUID)}, nil
}

// toServicesIndexFunc is the index function registered under toServicesIndex.
// Given a *rule in obj, it produces one index key per distinct Service listed in
// the rule's To.ToServices field; each key is built by k8s.NamespacedName from the
// Service's Namespace and Name. The returned error is always nil. obj must be a
// *rule: the type assertion is unchecked and panics otherwise.
func toServicesIndexFunc(obj interface{}) ([]string, error) {
	r := obj.(*rule)
	// Collect keys in a set so that a Service referenced more than once yields a
	// single index entry.
	keys := make(sets.String)
	for i := range r.To.ToServices {
		svc := r.To.ToServices[i]
		keys.Insert(k8s.NamespacedName(svc.Namespace, svc.Name))
	}
	return keys.List(), nil
}

// newRuleCache returns a new *ruleCache.
func newRuleCache(dirtyRuleHandler func(string), podUpdate <-chan antreatypes.EntityReference) *ruleCache {
func newRuleCache(dirtyRuleHandler func(string), podUpdate <-chan antreatypes.EntityReference, serviceGroupIDUpdate <-chan string) *ruleCache {
rules := cache.NewIndexer(
ruleKeyFunc,
cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc},
cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc, toServicesIndex: toServicesIndexFunc},
)
cache := &ruleCache{
appliedToSetByGroup: make(map[string]v1beta.GroupMemberSet),
addressSetByGroup: make(map[string]v1beta.GroupMemberSet),
policyMap: make(map[string]*v1beta.NetworkPolicy),
rules: rules,
dirtyRuleHandler: dirtyRuleHandler,
entityUpdates: podUpdate,
appliedToSetByGroup: make(map[string]v1beta.GroupMemberSet),
addressSetByGroup: make(map[string]v1beta.GroupMemberSet),
policyMap: make(map[string]*v1beta.NetworkPolicy),
rules: rules,
dirtyRuleHandler: dirtyRuleHandler,
entityUpdates: podUpdate,
serviceGroupIDUpdates: serviceGroupIDUpdate,
}
go cache.processEntityUpdates()
go cache.processServiceGroupIDUpdates()
return cache
}

Expand Down Expand Up @@ -364,6 +384,24 @@ func (c *ruleCache) processEntityUpdates() {
}
}

// processServiceGroupIDUpdates consumes Service groupID update events from the
// serviceGroupIDUpdates channel. Each event is the key of a Service; every rule
// stored under that key in the toServicesIndex index is passed, by rule ID, to
// dirtyRuleHandler so that it is re-queued.
//
// The method blocks while the channel is empty and returns only once the channel
// has been closed and drained, so it is intended to run in its own goroutine.
func (c *ruleCache) processServiceGroupIDUpdates() {
	// Range over the channel rather than wrapping a single-case select in a bare
	// for loop: while the channel is open the behavior is the same, but if it is
	// ever closed the loop ends instead of spinning on zero-value receives.
	for svcNamespacedName := range c.serviceGroupIDUpdates {
		toSvcRules, err := c.rules.ByIndex(toServicesIndex, svcNamespacedName)
		if err != nil {
			// Best effort: drop this event and keep serving subsequent ones.
			// NOTE(review): ByIndex presumably fails only when the index name
			// is not registered on the indexer - confirm against client-go.
			continue
		}
		for _, obj := range toSvcRules {
			c.dirtyRuleHandler(obj.(*rule).ID)
		}
	}
}

// GetAddressGroupNum gets the number of AddressGroup.
func (c *ruleCache) GetAddressGroupNum() int {
c.addressSetLock.RLock()
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/controller/networkpolicy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ func TestRuleCacheAddAddressGroup(t *testing.T) {
func newFakeRuleCache() (*ruleCache, *dirtyRuleRecorder, chan types.EntityReference) {
recorder := newDirtyRuleRecorder()
ch := make(chan types.EntityReference, 100)
c := newRuleCache(recorder.Record, ch)
ch2 := make(chan string, 100)
c := newRuleCache(recorder.Record, ch, ch2)
return c, recorder, ch
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
ifaceStore interfacestore.InterfaceStore,
nodeName string,
entityUpdates <-chan types.EntityReference,
serviceGroupIDUpdates <-chan string,
antreaPolicyEnabled bool,
statusManagerEnabled bool,
loggingEnabled bool,
Expand All @@ -136,7 +137,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
}
}
c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController)
c.ruleCache = newRuleCache(c.enqueueRule, entityUpdates)
c.ruleCache = newRuleCache(c.enqueueRule, entityUpdates, serviceGroupIDUpdates)
if statusManagerEnabled {
c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (g *antreaClientGetter) GetAntreaClient() (versioned.Interface, error) {
func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
clientset := &fake.Clientset{}
ch := make(chan agenttypes.EntityReference, 100)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch,
ch2 := make(chan string, 100)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, ch2,
true, true, true, nil, testAsyncDeleteInterval, "8.8.8.8:53")
reconciler := newMockReconciler()
controller.reconciler = reconciler
Expand Down
Loading

0 comments on commit 195d2ee

Please sign in to comment.