Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.5] Use GC rather than refcounting for VNID policy rules #14801

Merged
merged 2 commits into from
Jun 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions pkg/sdn/plugin/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"net"
"os"
"strconv"
"strings"
"time"

Expand All @@ -13,12 +14,14 @@ import (
osapi "github.com/openshift/origin/pkg/sdn/api"
"github.com/openshift/origin/pkg/util/ipcmd"
"github.com/openshift/origin/pkg/util/netutils"
"github.com/openshift/origin/pkg/util/ovs"

kapi "k8s.io/kubernetes/pkg/api"
kapierrors "k8s.io/kubernetes/pkg/api/errors"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
kexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/sysctl"
utilwait "k8s.io/kubernetes/pkg/util/wait"
)
Expand Down Expand Up @@ -489,3 +492,66 @@ func generateAddServiceRule(netID uint32, IP string, protocol kapi.Protocol, por
func generateDeleteServiceRule(IP string, protocol kapi.Protocol, port int) string {
return generateBaseServiceRule(IP, protocol, port)
}

// FindUnusedVNIDs returns a list of VNIDs for which there are table 80 "check" rules,
// but no table 60/70 "load" rules (meaning that there are no longer any pods or services
// on this node with that VNID). There is no locking with respect to other ovsController
// actions, but as long the "add a pod" and "add a service" codepaths add the
// pod/service-specific rules before they call policy.EnsureVNIDRules(), then there is no
// race condition.
func (node *OsdnNode) FindUnusedVNIDs() []int {
flows, err := node.ovs.DumpFlows()
if err != nil {
glog.Errorf("FindUnusedVNIDs: could not DumpFlows: %v", err)
return nil
}

// inUseVNIDs is the set of VNIDs in use by pods or services on this node.
// policyVNIDs is the set of VNIDs that we have rules for delivering to.
// VNID 0 is always assumed to be in both sets.
inUseVNIDs := sets.NewInt(0)
policyVNIDs := sets.NewInt(0)
for _, flow := range flows {
parsed, err := ovs.ParseFlow(ovs.ParseForDump, flow)
if err != nil {
glog.Warningf("FindUnusedVNIDs: could not parse flow %q: %v", flow, err)
continue
}

// A VNID is in use if there is a table 60 (services) or 70 (pods) flow that
// loads that VNID into reg1 for later comparison.
if parsed.Table == 60 || parsed.Table == 70 {
// Can't use FindAction here since there may be multiple "load"s
for _, action := range parsed.Actions {
if action.Name != "load" || strings.Index(action.Value, "REG1") == -1 {
continue
}
vnidEnd := strings.Index(action.Value, "->")
if vnidEnd == -1 {
continue
}
vnid, err := strconv.ParseInt(action.Value[:vnidEnd], 0, 32)
if err != nil {
glog.Warningf("FindUnusedVNIDs: could not parse VNID in 'load:%s': %v", action.Value, err)
continue
}
inUseVNIDs.Insert(int(vnid))
break
}
}

// A VNID is checked by policy if there is a table 80 rule comparing reg1 to it.
if parsed.Table == 80 {
if field, exists := parsed.FindField("reg1"); exists {
vnid, err := strconv.ParseInt(field.Value, 0, 32)
if err != nil {
glog.Warningf("FindUnusedVNIDs: could not parse VNID in 'reg1=%s': %v", field.Value, err)
continue
}
policyVNIDs.Insert(int(vnid))
}
}
}

return policyVNIDs.Difference(inUseVNIDs).UnsortedList()
}
83 changes: 22 additions & 61 deletions pkg/sdn/plugin/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ type multiTenantPlugin struct {
node *OsdnNode
vnids *nodeVNIDMap

vnidRefsLock sync.Mutex
vnidRefs map[uint32]int
vnidInUseLock sync.Mutex
vnidInUse map[uint32]bool
}

func NewMultiTenantPlugin() osdnPolicy {
return &multiTenantPlugin{
vnidRefs: make(map[uint32]int),
vnidInUse: make(map[uint32]bool),
}
}

Expand Down Expand Up @@ -65,14 +65,10 @@ func (mp *multiTenantPlugin) updatePodNetwork(namespace string, oldNetID, netID
}

if oldNetID != netID {
movedVNIDRefs := 0

// Update OF rules for the existing/old pods in the namespace
for _, pod := range pods {
err = mp.node.UpdatePod(pod)
if err == nil {
movedVNIDRefs++
} else {
if err != nil {
glog.Errorf("Could not update pod %q in namespace %q: %v", pod.Name, namespace, err)
}
}
Expand All @@ -85,12 +81,9 @@ func (mp *multiTenantPlugin) updatePodNetwork(namespace string, oldNetID, netID

mp.node.DeleteServiceRules(&svc)
mp.node.AddServiceRules(&svc, netID)
movedVNIDRefs++
}

if movedVNIDRefs > 0 {
mp.moveVNIDRefs(movedVNIDRefs, oldNetID, netID)
}
mp.EnsureVNIDRules(netID)

// Update namespace references in egress firewall rules
mp.node.UpdateEgressNetworkPolicyVNID(namespace, oldNetID, netID)
Expand Down Expand Up @@ -125,18 +118,19 @@ func (mp *multiTenantPlugin) GetMulticastEnabled(vnid uint32) bool {
return mp.vnids.GetMulticastEnabled(vnid)
}

func (mp *multiTenantPlugin) RefVNID(vnid uint32) {
func (mp *multiTenantPlugin) EnsureVNIDRules(vnid uint32) {
if vnid == 0 {
return
}

mp.vnidRefsLock.Lock()
defer mp.vnidRefsLock.Unlock()
mp.vnidRefs[vnid] += 1
if mp.vnidRefs[vnid] > 1 {
mp.vnidInUseLock.Lock()
defer mp.vnidInUseLock.Unlock()
if mp.vnidInUse[vnid] {
return
}
glog.V(5).Infof("RefVNID %d adding rule", vnid)
mp.vnidInUse[vnid] = true

glog.V(5).Infof("EnsureVNIDRules %d - adding rules", vnid)

otx := mp.node.ovs.NewTransaction()
otx.AddFlow("table=80, priority=100, reg0=%d, reg1=%d, actions=output:NXM_NX_REG2[]", vnid, vnid)
Expand All @@ -145,52 +139,19 @@ func (mp *multiTenantPlugin) RefVNID(vnid uint32) {
}
}

func (mp *multiTenantPlugin) UnrefVNID(vnid uint32) {
if vnid == 0 {
return
}

mp.vnidRefsLock.Lock()
defer mp.vnidRefsLock.Unlock()
if mp.vnidRefs[vnid] == 0 {
glog.Warningf("refcounting error on vnid %d", vnid)
return
}
mp.vnidRefs[vnid] -= 1
if mp.vnidRefs[vnid] > 0 {
return
}
glog.V(5).Infof("UnrefVNID %d removing rule", vnid)

otx := mp.node.ovs.NewTransaction()
otx.DeleteFlows("table=80, reg0=%d, reg1=%d", vnid, vnid)
if err := otx.EndTransaction(); err != nil {
glog.Errorf("Error deleting OVS flow for VNID: %v", err)
}
}

func (mp *multiTenantPlugin) moveVNIDRefs(num int, oldVNID, newVNID uint32) {
glog.V(5).Infof("moveVNIDRefs %d -> %d", oldVNID, newVNID)
func (mp *multiTenantPlugin) SyncVNIDRules() {
mp.vnidInUseLock.Lock()
defer mp.vnidInUseLock.Unlock()

mp.vnidRefsLock.Lock()
defer mp.vnidRefsLock.Unlock()
unused := mp.node.FindUnusedVNIDs()
glog.Infof("SyncVNIDRules: %d unused VNIDs", len(unused))

otx := mp.node.ovs.NewTransaction()
if mp.vnidRefs[oldVNID] <= num {
otx.DeleteFlows("table=80, reg0=%d, reg1=%d", oldVNID, oldVNID)
for _, vnid := range unused {
mp.vnidInUse[uint32(vnid)] = false
otx.DeleteFlows("table=80, reg1=%d", vnid)
}
if mp.vnidRefs[newVNID] == 0 {
otx.AddFlow("table=80, priority=100, reg0=%d, reg1=%d, actions=output:NXM_NX_REG2[]", newVNID, newVNID)
}
err := otx.EndTransaction()
if err != nil {
glog.Errorf("Error modifying OVS flows for VNID: %v", err)
}

mp.vnidRefs[oldVNID] -= num
if mp.vnidRefs[oldVNID] < 0 {
glog.Warningf("refcounting error on vnid %d", oldVNID)
mp.vnidRefs[oldVNID] = 0
if err := otx.EndTransaction(); err != nil {
glog.Errorf("Error deleting syncing OVS VNID rules: %v", err)
}
mp.vnidRefs[newVNID] += num
}
56 changes: 27 additions & 29 deletions pkg/sdn/plugin/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type npNamespace struct {
name string
vnid uint32
isolated bool
refs int
inUse bool

policies map[ktypes.UID]*npPolicy
Expand Down Expand Up @@ -111,7 +110,7 @@ func (np *networkPolicyPlugin) initNamespaces() error {
name: ns.Name,
vnid: vnid,
isolated: namespaceIsIsolated(&ns),
refs: 0,
inUse: false,
policies: make(map[ktypes.UID]*npPolicy),
}
}
Expand Down Expand Up @@ -154,7 +153,7 @@ func (np *networkPolicyPlugin) AddNetNamespace(netns *osapi.NetNamespace) {
name: netns.NetName,
vnid: netns.NetID,
isolated: isolated,
refs: 0,
inUse: false,
policies: make(map[ktypes.UID]*npPolicy),
}
}
Expand Down Expand Up @@ -187,15 +186,10 @@ func (np *networkPolicyPlugin) GetMulticastEnabled(vnid uint32) bool {
}

func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
inUse := npns.refs > 0
if !inUse && !npns.inUse {
return
}

glog.V(5).Infof("syncNamespace %d", npns.vnid)
otx := np.node.ovs.NewTransaction()
otx.DeleteFlows("table=80, reg1=%d", npns.vnid)
if inUse {
if npns.inUse {
if npns.isolated {
for _, npp := range npns.policies {
for _, flow := range npp.flows {
Expand All @@ -209,37 +203,35 @@ func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
if err := otx.EndTransaction(); err != nil {
glog.Errorf("Error syncing OVS flows for VNID: %v", err)
}
npns.inUse = inUse
}

func (np *networkPolicyPlugin) RefVNID(vnid uint32) {
func (np *networkPolicyPlugin) EnsureVNIDRules(vnid uint32) {
np.lock.Lock()
defer np.lock.Unlock()

npns, exists := np.namespaces[vnid]
if !exists {
if !exists || npns.inUse {
return
}

npns.refs += 1
npns.inUse = true
np.syncNamespace(npns)
}

func (np *networkPolicyPlugin) UnrefVNID(vnid uint32) {
func (np *networkPolicyPlugin) SyncVNIDRules() {
np.lock.Lock()
defer np.lock.Unlock()

npns, exists := np.namespaces[vnid]
if !exists {
return
}
if npns.refs == 0 {
glog.Warningf("refcounting error on vnid %d", vnid)
return
}
unused := np.node.FindUnusedVNIDs()
glog.Infof("SyncVNIDRules: %d unused VNIDs", len(unused))

npns.refs -= 1
np.syncNamespace(npns)
for _, vnid := range unused {
npns, exists := np.namespaces[uint32(vnid)]
if exists {
npns.inUse = false
np.syncNamespace(npns)
}
}
}

// watchPods watches Pod changes in npns until stopPodWatch is triggered. pods
Expand Down Expand Up @@ -294,7 +286,7 @@ func (np *networkPolicyPlugin) watchPods(npns *npNamespace, pods map[ktypes.UID]
}
}
}
if changed {
if changed && npns.inUse {
np.syncNamespace(npns)
}

Expand Down Expand Up @@ -485,11 +477,15 @@ func (np *networkPolicyPlugin) watchNetworkPolicies() {
switch delta.Type {
case cache.Sync, cache.Added, cache.Updated:
if changed := np.updateNetworkPolicy(npns, policy); changed {
np.syncNamespace(npns)
if npns.inUse {
np.syncNamespace(npns)
}
}
case cache.Deleted:
delete(npns.policies, policy.UID)
np.syncNamespace(npns)
if npns.inUse {
np.syncNamespace(npns)
}
}

return nil
Expand Down Expand Up @@ -548,7 +544,9 @@ func (np *networkPolicyPlugin) watchNamespaces() {
np.kNamespaces[ns.Name] = *ns
if npns, exists := np.namespaces[vnid]; exists {
npns.isolated = namespaceIsIsolated(ns)
np.syncNamespace(npns)
if npns.inUse {
np.syncNamespace(npns)
}
}
// else the NetNamespace doesn't exist yet, but we will initialize
// npns.isolated from the kapi.Namespace when it's created
Expand All @@ -571,7 +569,7 @@ func (np *networkPolicyPlugin) watchNamespaces() {
}
}
}
if changed {
if changed && npns.inUse {
np.syncNamespace(npns)
}
}
Expand Down
Loading