diff --git a/npc/controller.go b/npc/controller.go index fc363dfde4..304ca04a8e 100644 --- a/npc/controller.go +++ b/npc/controller.go @@ -59,7 +59,7 @@ func (npc *controller) onNewNsSelector(selector *selector) error { for _, ns := range npc.nss { if ns.namespace != nil { if selector.matches(ns.namespace.ObjectMeta.Labels) { - if err := selector.addEntry(string(ns.allPods.ipsetName), namespaceComment(ns)); err != nil { + if err := selector.addEntry(ns.namespace.ObjectMeta.UID, string(ns.allPods.ipsetName), namespaceComment(ns)); err != nil { return err } } diff --git a/npc/controller_test.go b/npc/controller_test.go index ca690ac161..4219bae2b6 100644 --- a/npc/controller_test.go +++ b/npc/controller_test.go @@ -10,13 +10,14 @@ import ( coreapi "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" ) type mockSet struct { name ipset.Name setType ipset.Type - subSets map[string]bool + subSets map[string]map[types.UID]bool } type mockIPSet struct { @@ -24,70 +25,57 @@ type mockIPSet struct { } func newMockIPSet() mockIPSet { - i := mockIPSet{ - sets: make(map[string]mockSet), - } - - return i + return mockIPSet{sets: make(map[string]mockSet)} } func (i *mockIPSet) Create(ipsetName ipset.Name, ipsetType ipset.Type) error { if _, ok := i.sets[string(ipsetName)]; ok { return errors.Errorf("ipset %s already exists", ipsetName) } - i.sets[string(ipsetName)] = mockSet{name: ipsetName, setType: ipsetType, subSets: make(map[string]bool)} + i.sets[string(ipsetName)] = mockSet{name: ipsetName, setType: ipsetType, subSets: make(map[string]map[types.UID]bool)} return nil } -func (i *mockIPSet) AddEntry(ipsetName ipset.Name, entry string, comment string) error { - return i.addEntry(ipsetName, entry, comment, true) -} - -func (i *mockIPSet) AddEntryIfNotExist(ipsetName ipset.Name, entry string, comment string) error { - return i.addEntry(ipsetName, entry, comment, false) -} - -func (i *mockIPSet) addEntry(ipsetName ipset.Name, entry string, comment string, checkIfExists bool) error { - log.Printf("adding entry %s to %s", entry, ipsetName) +func (i *mockIPSet) AddEntry(user types.UID, ipsetName ipset.Name, entry string, comment string) error { + log.Printf("adding entry %s to %s for %s", entry, ipsetName, user) if _, ok := i.sets[string(ipsetName)]; !ok { - return errors.Errorf("ipset %s does not exist", entry) + return errors.Errorf("%s does not exist", entry) + } + if i.sets[string(ipsetName)].subSets[entry] == nil { + i.sets[string(ipsetName)].subSets[entry] = make(map[types.UID]bool) } - if checkIfExists { - if _, ok := i.sets[string(ipsetName)].subSets[entry]; ok { - return errors.Errorf("ipset %s is already a member of %s", entry, ipsetName) - } + if _, ok := i.sets[string(ipsetName)].subSets[entry][user]; ok { + return errors.Errorf("user %s already owns entry %s", user, entry) } - i.sets[string(ipsetName)].subSets[entry] = true + i.sets[string(ipsetName)].subSets[entry][user] = true return nil } -func (i *mockIPSet) DelEntry(ipsetName ipset.Name, entry string) error { - return i.delEntry(ipsetName, entry, true) -} - -func (i *mockIPSet) DelEntryIfExists(ipsetName ipset.Name, entry string) error { - return i.delEntry(ipsetName, entry, false) -} - -func (i *mockIPSet) delEntry(ipsetName ipset.Name, entry string, checkIfExists bool) error { - log.Printf("deleting entry %s from %s", entry, ipsetName) +func (i *mockIPSet) DelEntry(user types.UID, ipsetName ipset.Name, entry string) error { + log.Printf("deleting entry %s from %s for %s", entry, ipsetName, user) if _, ok := i.sets[string(ipsetName)]; !ok { return errors.Errorf("ipset %s does not exist", ipsetName) } - if checkIfExists { - if _, ok := i.sets[string(ipsetName)].subSets[entry]; !ok { - return errors.Errorf("ipset %s is not a member of %s", entry, ipsetName) - } + if _, ok := i.sets[string(ipsetName)].subSets[entry][user]; !ok { + return errors.Errorf("user %s does not own entry %s", user, entry) + } + delete(i.sets[string(ipsetName)].subSets[entry], user) + + if len(i.sets[string(ipsetName)].subSets[entry]) == 0 { + delete(i.sets[string(ipsetName)].subSets, entry) } - delete(i.sets[string(ipsetName)].subSets, entry) return nil } -func (i *mockIPSet) Exist(ipsetName ipset.Name, entry string) bool { - _, found := i.sets[string(ipsetName)].subSets[entry] - return found +func (i *mockIPSet) Exist(user types.UID, ipsetName ipset.Name, entry string) bool { + _, ok := i.sets[string(ipsetName)].subSets[entry][user] + return ok +} + +func (i *mockIPSet) entryExists(ipsetName ipset.Name, entry string) bool { + return len(i.sets[string(ipsetName)].subSets[entry]) > 0 } func (i *mockIPSet) Flush(ipsetName ipset.Name) error { @@ -191,7 +179,7 @@ func TestRegressionPolicyNamespaceOrdering3059(t *testing.T) { controller.AddNetworkPolicy(networkPolicy) - require.Equal(t, true, m.sets[selectorIPSetName].subSets[sourceIPSetName]) + require.Equal(t, true, len(m.sets[selectorIPSetName].subSets[sourceIPSetName]) > 0) // NetworkPolicy first m = newMockIPSet() @@ -202,8 +190,7 @@ func TestRegressionPolicyNamespaceOrdering3059(t *testing.T) { controller.AddNamespace(sourceNamespace) controller.AddNamespace(destinationNamespace) - require.Equal(t, true, m.sets[selectorIPSetName].subSets[sourceIPSetName]) - + require.Equal(t, true, len(m.sets[selectorIPSetName].subSets[sourceIPSetName]) > 0) } // Tests default-allow ipset behavior when running in non-legacy mode. @@ -238,7 +225,7 @@ func TestDefaultAllow(t *testing.T) { controller.AddPod(podFoo) // Should add the foo pod to default-allow - require.True(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + require.True(t, m.entryExists(defaultAllowIPSetName, fooPodIP)) podBar := &coreapi.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -252,7 +239,7 @@ func TestDefaultAllow(t *testing.T) { controller.UpdatePod(podBarNoIP, podBar) // Should add the bar pod to default-allow - require.True(t, m.Exist(defaultAllowIPSetName, barPodIP)) + require.True(t, m.entryExists(defaultAllowIPSetName, barPodIP)) // Allow access from the bar pod to the foo pod netpol := &networkingv1.NetworkPolicy{ @@ -274,37 +261,118 @@ func TestDefaultAllow(t *testing.T) { controller.AddNetworkPolicy(netpol) // Should remove the foo pod from default-allow as the netpol selects it - require.False(t, m.Exist(defaultAllowIPSetName, fooPodIP)) - require.True(t, m.Exist(defaultAllowIPSetName, barPodIP)) + require.False(t, m.entryExists(defaultAllowIPSetName, fooPodIP)) + require.True(t, m.entryExists(defaultAllowIPSetName, barPodIP)) podBarWithNewIP := *podBar podBarWithNewIP.Status.PodIP = barPodNewIP controller.UpdatePod(podBar, &podBarWithNewIP) // Should update IP addr of the bar pod in default-allow - require.False(t, m.Exist(defaultAllowIPSetName, barPodIP)) - require.True(t, m.Exist(defaultAllowIPSetName, barPodNewIP)) + require.False(t, m.entryExists(defaultAllowIPSetName, barPodIP)) + require.True(t, m.entryExists(defaultAllowIPSetName, barPodNewIP)) controller.UpdatePod(&podBarWithNewIP, podBarNoIP) // Should remove the bar pod from default-allow as it does not have any IP addr - require.False(t, m.Exist(defaultAllowIPSetName, barPodNewIP)) + require.False(t, m.entryExists(defaultAllowIPSetName, barPodNewIP)) podFooWithNewLabel := *podFoo podFooWithNewLabel.ObjectMeta.Labels = map[string]string{"run": "new-foo"} controller.UpdatePod(podFoo, &podFooWithNewLabel) // Should bring back the foo pod to default-allow as it does not match dst of any netpol - require.True(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + require.True(t, m.entryExists(defaultAllowIPSetName, fooPodIP)) controller.UpdatePod(&podFooWithNewLabel, podFoo) // Should remove from default-allow as it matches the netpol after the update - require.False(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + require.False(t, m.entryExists(defaultAllowIPSetName, fooPodIP)) controller.DeleteNetworkPolicy(netpol) // Should bring back the foo pod to default-allow as no netpol selects it - require.True(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + require.True(t, m.entryExists(defaultAllowIPSetName, fooPodIP)) controller.DeletePod(podFoo) // Should remove foo pod from default-allow - require.False(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + require.False(t, m.entryExists(defaultAllowIPSetName, fooPodIP)) +} + +func TestOutOfOrderPodEvents(t *testing.T) { + const ( + defaultAllowIPSetName = "weave-E.1.0W^NGSp]0_t5WwH/]gX@L" + runBarIPSetName = "weave-bZ~x=yBgzH)Ht()K*Uv3z{M]Y" + podIP = "10.32.0.10" + ) + + m := newMockIPSet() + controller := New("qux", false, &mockIPTables{}, &m) + + defaultNamespace := &coreapi.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + controller.AddNamespace(defaultNamespace) + + podFoo := &coreapi.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "foo", + Namespace: "default", + Name: "foo", + Labels: map[string]string{"run": "foo"}}, + Status: coreapi.PodStatus{PodIP: podIP}} + controller.AddPod(podFoo) + + // Should be in default-allow as no netpol selects podFoo + require.True(t, m.entryExists(defaultAllowIPSetName, podIP)) + + netpol := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "allow-from-bar-to-foo", + Namespace: "default", + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"run": "foo"}}, + Ingress: []networkingv1.NetworkPolicyIngressRule{{ + From: []networkingv1.NetworkPolicyPeer{{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"run": "bar"}, + }, + }}, + }}, + }, + } + controller.AddNetworkPolicy(netpol) + + // Shouldn't be in default-allow as netpol above selects podFoo + require.False(t, m.entryExists(defaultAllowIPSetName, podIP)) + + podBar := &coreapi.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "bar", + Namespace: "default", + Name: "bar", + Labels: map[string]string{"run": "bar"}}, + Status: coreapi.PodStatus{PodIP: podIP}} + controller.AddPod(podBar) + + // Should be in default-allow as no netpol selects podBar + require.True(t, m.entryExists(defaultAllowIPSetName, podIP)) + require.True(t, m.Exist(podBar.ObjectMeta.UID, defaultAllowIPSetName, podIP)) + // Should be in run=bar ipset + require.True(t, m.entryExists(runBarIPSetName, podIP)) + + controller.DeletePod(podFoo) + // Multiple duplicate events should not affect npc state + controller.DeletePod(podFoo) + controller.DeletePod(podFoo) + + // Should be in default-allow as no netpol selects podBar and podFoo removal + // should not affect podBar in default-allow + require.True(t, m.entryExists(defaultAllowIPSetName, podIP)) + + controller.DeletePod(podBar) + + // Should remove from default-allow and run=bar ipsets + require.Equal(t, 0, len(m.sets[defaultAllowIPSetName].subSets)) + require.False(t, m.entryExists(runBarIPSetName, podIP)) } diff --git a/npc/ipset/ipset.go b/npc/ipset/ipset.go index 2a9b0d4e62..1e57b1d748 100644 --- a/npc/ipset/ipset.go +++ b/npc/ipset/ipset.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/types" ) type Name string @@ -19,11 +20,9 @@ const ( type Interface interface { Create(ipsetName Name, ipsetType Type) error - AddEntry(ipsetName Name, entry string, comment string) error - AddEntryIfNotExist(ipsetName Name, entry string, comment string) error - DelEntry(ipsetName Name, entry string) error - DelEntryIfExists(ipsetName Name, entry string) error - Exist(ipsetName Name, entry string) bool + AddEntry(user types.UID, ipsetName Name, entry string, comment string) error + DelEntry(user types.UID, ipsetName Name, entry string) error + Exist(user types.UID, ipsetName Name, entry string) bool Flush(ipsetName Name) error Destroy(ipsetName Name) error @@ -33,14 +32,27 @@ type Interface interface { DestroyAll() error } +type entryKey struct { + ipsetName Name + entry string +} + type ipset struct { - refCount *log.Logger enableComments bool + // List of users per ipset entry. User is either a namespace or a pod. + // There might be multiple users for the same ipset & entry pair because + // events from k8s API server might be out of order causing duplicate IPs: + // https://github.com/weaveworks/weave/issues/2792. + users map[entryKey]map[types.UID]struct{} } func New(logger *log.Logger) Interface { - ips := &ipset{refCount: newRefCount(), Logger: logger, enableComments: true} + ips := &ipset{ + Logger: logger, + enableComments: true, + users: make(map[entryKey]map[types.UID]struct{}), + } // Check for comment support testIpsetName := Name("weave-test-comment") @@ -65,11 +77,15 @@ func (i *ipset) Create(ipsetName Name, ipsetType Type) error { return doExec(args...) } -func (i *ipset) AddEntry(ipsetName Name, entry string, comment string) error { - i.Logger.Printf("adding entry %s to %s", entry, ipsetName) - if i.inc(ipsetName, entry) > 1 { // already in the set +func (i *ipset) AddEntry(user types.UID, ipsetName Name, entry string, comment string) error { + i.Logger.Printf("adding entry %s to %s of %s", entry, ipsetName, user) + + if !i.addUser(user, ipsetName, entry) { // already in the set return nil } + + i.Logger.Printf("added entry %s to %s of %s", entry, ipsetName, user) + args := []string{"add", string(ipsetName), entry} if i.enableComments { args = append(args, "comment", comment) @@ -77,53 +93,39 @@ func (i *ipset) AddEntry(ipsetName Name, entry string, comment string) error { return doExec(args...) } -// AddEntryIfNotExist does the same as AddEntry but bypasses the ref counting. -// Should be used only with "default-allow" ipsets. -func (i *ipset) AddEntryIfNotExist(ipsetName Name, entry string, comment string) error { - if i.count(ipsetName, entry) == 1 { - return nil - } - return i.AddEntry(ipsetName, entry, comment) -} +func (i *ipset) DelEntry(user types.UID, ipsetName Name, entry string) error { + i.Logger.Printf("deleting entry %s from %s of %s", entry, ipsetName, user) -func (i *ipset) DelEntry(ipsetName Name, entry string) error { - i.Logger.Printf("deleting entry %s from %s", entry, ipsetName) - if i.dec(ipsetName, entry) > 0 { // still needed + if !i.delUser(user, ipsetName, entry) { // still needed return nil } - return doExec("del", string(ipsetName), entry) -} -// DelEntryIfExists does the same as DelEntry but bypasses the ref counting. -// Should be used only with "default-allow" ipsets. -func (i *ipset) DelEntryIfExists(ipsetName Name, entry string) error { - if i.count(ipsetName, entry) == 0 { - return nil - } - return i.DelEntry(ipsetName, entry) + i.Logger.Printf("deleted entry %s from %s of %s", entry, ipsetName, user) + + return doExec("del", string(ipsetName), entry) } -func (i *ipset) Exist(ipsetName Name, entry string) bool { - return i.count(ipsetName, entry) > 0 +func (i *ipset) Exist(user types.UID, ipsetName Name, entry string) bool { + return i.existUser(user, ipsetName, entry) } func (i *ipset) Flush(ipsetName Name) error { - i.removeSet(ipsetName) + i.removeSetFromUsers(ipsetName) return doExec("flush", string(ipsetName)) } func (i *ipset) FlushAll() error { - i.refCount = newRefCount() + i.users = make(map[entryKey]map[types.UID]struct{}) return doExec("flush") } func (i *ipset) Destroy(ipsetName Name) error { - i.removeSet(ipsetName) + i.removeSetFromUsers(ipsetName) return doExec("destroy", string(ipsetName)) } func (i *ipset) DestroyAll() error { - i.refCount = newRefCount() + i.users = make(map[entryKey]map[types.UID]struct{}) return doExec("destroy") } @@ -145,48 +147,52 @@ func (i *ipset) List(prefix string) ([]Name, error) { return selected, err } -func doExec(args ...string) error { - if output, err := exec.Command("ipset", args...).CombinedOutput(); err != nil { - return errors.Wrapf(err, "ipset %v failed: %s", args, output) +// Returns true if entry does not exist in ipset (entry has to be inserted into ipset). +func (i *ipset) addUser(user types.UID, ipsetName Name, entry string) bool { + k := entryKey{ipsetName, entry} + add := false + + if i.users[k] == nil { + i.users[k] = make(map[types.UID]struct{}) } - return nil -} + if len(i.users[k]) == 0 { + add = true + } + i.users[k][user] = struct{}{} -// Reference-counting -type key struct { - ipsetName Name - entry string + return add } -// note no locking is required as all operations are serialised in the controller -type refCount struct { - ref map[key]int -} +// Returns true if user is the last owner of entry (entry has to be removed from ipset). +func (i *ipset) delUser(user types.UID, ipsetName Name, entry string) bool { + k := entryKey{ipsetName, entry} -func newRefCount() refCount { - return refCount{ref: make(map[key]int)} -} + oneLeft := len(i.users[k]) == 1 + delete(i.users[k], user) -func (rc *refCount) inc(ipsetName Name, entry string) int { - k := key{ipsetName, entry} - rc.ref[k]++ - return rc.ref[k] -} + if len(i.users[k]) == 0 { + delete(i.users, k) + } -func (rc *refCount) dec(ipsetName Name, entry string) int { - k := key{ipsetName, entry} - rc.ref[k]-- - return rc.ref[k] + return oneLeft && (len(i.users[k]) == 0) } -func (rc *refCount) count(ipsetName Name, entry string) int { - return rc.ref[key{ipsetName, entry}] +func (i *ipset) existUser(user types.UID, ipsetName Name, entry string) bool { + _, ok := i.users[entryKey{ipsetName, entry}][user] + return ok } -func (rc *refCount) removeSet(ipsetName Name) { - for k := range rc.ref { +func (i *ipset) removeSetFromUsers(ipsetName Name) { + for k := range i.users { if k.ipsetName == ipsetName { - delete(rc.ref, k) + delete(i.users, k) } } } + +func doExec(args ...string) error { + if output, err := exec.Command("ipset", args...).CombinedOutput(); err != nil { + return errors.Wrapf(err, "ipset %v failed: %s", args, output) + } + return nil +} diff --git a/npc/namespace.go b/npc/namespace.go index 85c893e09b..09da38f8b3 100644 --- a/npc/namespace.go +++ b/npc/namespace.go @@ -92,7 +92,7 @@ func (ns *ns) onNewPodSelector(selector *selector) error { for _, pod := range ns.pods { if hasIP(pod) { if selector.matches(pod.ObjectMeta.Labels) { - if err := selector.addEntry(pod.Status.PodIP, podComment(pod)); err != nil { + if err := selector.addEntry(pod.ObjectMeta.UID, pod.Status.PodIP, podComment(pod)); err != nil { return err } @@ -111,7 +111,7 @@ func (ns *ns) onNewDstPodSelector(selector *selector) error { if hasIP(pod) { // Remove the pod from default-allow if dst podselector matches the pod if selector.matches(pod.ObjectMeta.Labels) { - if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, pod.Status.PodIP); err != nil { + if err := ns.ips.DelEntry(pod.ObjectMeta.UID, ns.defaultAllowIPSet, pod.Status.PodIP); err != nil { return err } } @@ -150,7 +150,7 @@ func (ns *ns) addToDefaultAllowIfNoMatching(pod *coreapi.Pod) error { } } if !found { - if err := ns.ips.AddEntryIfNotExist(ns.defaultAllowIPSet, pod.Status.PodIP, podComment(pod)); err != nil { + if err := ns.ips.AddEntry(pod.ObjectMeta.UID, ns.defaultAllowIPSet, pod.Status.PodIP, podComment(pod)); err != nil { return err } } @@ -172,16 +172,16 @@ func (ns *ns) addPod(obj *coreapi.Pod) error { } if ns.checkLocalPod(obj) { - ns.ips.AddEntry(LocalIpset, obj.Status.PodIP, podComment(obj)) + ns.ips.AddEntry(obj.ObjectMeta.UID, LocalIpset, obj.Status.PodIP, podComment(obj)) } - found, err := ns.podSelectors.addToMatching(obj.ObjectMeta.Labels, obj.Status.PodIP, podComment(obj)) + found, err := ns.podSelectors.addToMatching(obj.ObjectMeta.UID, obj.ObjectMeta.Labels, obj.Status.PodIP, podComment(obj)) if err != nil { return err } // If there are no matching dst selectors, add the pod to default-allow if !ns.legacy && !found { - if err := ns.ips.AddEntryIfNotExist(ns.defaultAllowIPSet, obj.Status.PodIP, podComment(obj)); err != nil { + if err := ns.ips.AddEntry(obj.ObjectMeta.UID, ns.defaultAllowIPSet, obj.Status.PodIP, podComment(obj)); err != nil { return err } } @@ -199,29 +199,29 @@ func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error { if hasIP(oldObj) && !hasIP(newObj) { if ns.checkLocalPod(oldObj) { - ns.ips.DelEntry(LocalIpset, oldObj.Status.PodIP) + ns.ips.DelEntry(oldObj.ObjectMeta.UID, LocalIpset, oldObj.Status.PodIP) } if !ns.legacy { - if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, oldObj.Status.PodIP); err != nil { + if err := ns.ips.DelEntry(oldObj.ObjectMeta.UID, ns.defaultAllowIPSet, oldObj.Status.PodIP); err != nil { return err } } - return ns.podSelectors.delFromMatching(oldObj.ObjectMeta.Labels, oldObj.Status.PodIP) + return ns.podSelectors.delFromMatching(oldObj.ObjectMeta.UID, oldObj.ObjectMeta.Labels, oldObj.Status.PodIP) } if !hasIP(oldObj) && hasIP(newObj) { if ns.checkLocalPod(newObj) { - ns.ips.AddEntry(LocalIpset, newObj.Status.PodIP, podComment(newObj)) + ns.ips.AddEntry(newObj.ObjectMeta.UID, LocalIpset, newObj.Status.PodIP, podComment(newObj)) } - found, err := ns.podSelectors.addToMatching(newObj.ObjectMeta.Labels, newObj.Status.PodIP, podComment(newObj)) + found, err := ns.podSelectors.addToMatching(newObj.ObjectMeta.UID, newObj.ObjectMeta.Labels, newObj.Status.PodIP, podComment(newObj)) if err != nil { return err } if !ns.legacy && !found { - if err := ns.ips.AddEntryIfNotExist(ns.defaultAllowIPSet, newObj.Status.PodIP, podComment(newObj)); err != nil { + if err := ns.ips.AddEntry(newObj.ObjectMeta.UID, ns.defaultAllowIPSet, newObj.Status.PodIP, podComment(newObj)); err != nil { return err } } @@ -231,12 +231,15 @@ func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error { if !ns.legacy && oldObj.Status.PodIP != newObj.Status.PodIP && - ns.ips.Exist(ns.defaultAllowIPSet, oldObj.Status.PodIP) { + ns.ips.Exist(oldObj.ObjectMeta.UID, ns.defaultAllowIPSet, oldObj.Status.PodIP) { + // Instead of iterating over all selectors we check whether old pod IP + // has been inserted into default-allow ipset to decide whether the IP + // in the ipset has to be updated. - if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, oldObj.Status.PodIP); err != nil { + if err := ns.ips.DelEntry(oldObj.ObjectMeta.UID, ns.defaultAllowIPSet, oldObj.Status.PodIP); err != nil { return err } - if err := ns.ips.AddEntryIfNotExist(ns.defaultAllowIPSet, newObj.Status.PodIP, podComment(newObj)); err != nil { + if err := ns.ips.AddEntry(newObj.ObjectMeta.UID, ns.defaultAllowIPSet, newObj.Status.PodIP, podComment(newObj)); err != nil { return err } } @@ -251,12 +254,12 @@ func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error { continue } if oldMatch { - if err := ps.delEntry(oldObj.Status.PodIP); err != nil { + if err := ps.delEntry(oldObj.ObjectMeta.UID, oldObj.Status.PodIP); err != nil { return err } } if newMatch { - if err := ps.addEntry(newObj.Status.PodIP, podComment(newObj)); err != nil { + if err := ps.addEntry(newObj.ObjectMeta.UID, newObj.Status.PodIP, podComment(newObj)); err != nil { return err } } @@ -264,7 +267,7 @@ func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error { if !ns.legacy && ns.podSelectors.dstSelectorExist(ps) { switch { case !oldMatch && newMatch: - if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, oldObj.Status.PodIP); err != nil { + if err := ns.ips.DelEntry(oldObj.ObjectMeta.UID, ns.defaultAllowIPSet, oldObj.Status.PodIP); err != nil { return err } case oldMatch && !newMatch: @@ -287,16 +290,16 @@ func (ns *ns) deletePod(obj *coreapi.Pod) error { } if ns.checkLocalPod(obj) { - ns.ips.DelEntry(LocalIpset, obj.Status.PodIP) + ns.ips.DelEntry(obj.ObjectMeta.UID, LocalIpset, obj.Status.PodIP) } if !ns.legacy { - if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, obj.Status.PodIP); err != nil { + if err := ns.ips.DelEntry(obj.ObjectMeta.UID, ns.defaultAllowIPSet, obj.Status.PodIP); err != nil { return err } } - return ns.podSelectors.delFromMatching(obj.ObjectMeta.Labels, obj.Status.PodIP) + return ns.podSelectors.delFromMatching(obj.ObjectMeta.UID, obj.ObjectMeta.Labels, obj.Status.PodIP) } func (ns *ns) addNetworkPolicy(obj interface{}) error { @@ -408,7 +411,7 @@ func (ns *ns) addNamespace(obj *coreapi.Namespace) error { } // Add namespace ipset to matching namespace selectors - _, err := ns.nsSelectors.addToMatching(obj.ObjectMeta.Labels, string(ns.allPods.ipsetName), namespaceComment(ns)) + _, err := ns.nsSelectors.addToMatching(obj.ObjectMeta.UID, obj.ObjectMeta.Labels, string(ns.allPods.ipsetName), namespaceComment(ns)) return err } @@ -442,12 +445,12 @@ func (ns *ns) updateNamespace(oldObj, newObj *coreapi.Namespace) error { continue } if oldMatch { - if err := selector.delEntry(string(ns.allPods.ipsetName)); err != nil { + if err := selector.delEntry(ns.namespace.ObjectMeta.UID, string(ns.allPods.ipsetName)); err != nil { return err } } if newMatch { - if err := selector.addEntry(string(ns.allPods.ipsetName), namespaceComment(ns)); err != nil { + if err := selector.addEntry(ns.namespace.ObjectMeta.UID, string(ns.allPods.ipsetName), namespaceComment(ns)); err != nil { return err } } @@ -468,7 +471,7 @@ func (ns *ns) deleteNamespace(obj *coreapi.Namespace) error { } // Remove namespace ipset from any matching namespace selectors - return ns.nsSelectors.delFromMatching(obj.ObjectMeta.Labels, string(ns.allPods.ipsetName)) + return ns.nsSelectors.delFromMatching(obj.ObjectMeta.UID, obj.ObjectMeta.Labels, string(ns.allPods.ipsetName)) } func (ns *ns) isDefaultDeny(namespace *coreapi.Namespace) bool { diff --git a/npc/selector.go b/npc/selector.go index bb0d0cb5af..35a53f2844 100644 --- a/npc/selector.go +++ b/npc/selector.go @@ -46,12 +46,12 @@ func (s *selector) matches(labelMap map[string]string) bool { return s.spec.selector.Matches(labels.Set(labelMap)) } -func (s *selector) addEntry(entry string, comment string) error { - return s.ips.AddEntry(s.spec.ipsetName, entry, comment) +func (s *selector) addEntry(user types.UID, entry string, comment string) error { + return s.ips.AddEntry(user, s.spec.ipsetName, entry, comment) } -func (s *selector) delEntry(entry string) error { - return s.ips.DelEntry(s.spec.ipsetName, entry) +func (s *selector) delEntry(user types.UID, entry string) error { + return s.ips.DelEntry(user, s.spec.ipsetName, entry) } type selectorFn func(selector *selector) error @@ -85,14 +85,14 @@ func newSelectorSet(ips ipset.Interface, onNewSelector, onNewDstSelector selecto dstSelectorsCount: make(map[string]int)} } -func (ss *selectorSet) addToMatching(labelMap map[string]string, entry string, comment string) (bool, error) { +func (ss *selectorSet) addToMatching(user types.UID, labelMap map[string]string, entry string, comment string) (bool, error) { found := false for _, s := range ss.entries { if s.matches(labelMap) { if ss.dstSelectorExist(s) { found = true } - if err := s.addEntry(entry, comment); err != nil { + if err := s.addEntry(user, entry, comment); err != nil { return found, err } } @@ -100,10 +100,10 @@ func (ss *selectorSet) addToMatching(labelMap map[string]string, entry string, c return found, nil } -func (ss *selectorSet) delFromMatching(labelMap map[string]string, entry string) error { +func (ss *selectorSet) delFromMatching(user types.UID, labelMap map[string]string, entry string) error { for _, s := range ss.entries { if s.matches(labelMap) { - if err := s.delEntry(entry); err != nil { + if err := s.delEntry(user, entry); err != nil { return err } }