Skip to content

Commit

Permalink
Get rid of executor
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
glazychev-art committed Sep 12, 2023
1 parent 75ddb60 commit 8c44798
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.20

require (
github.com/cilium/ebpf v0.10.0
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29
github.com/edwarnicke/serialize v1.0.7
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -32,7 +33,6 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/edwarnicke/exechelper v1.0.2 // indirect
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 // indirect
github.com/edwarnicke/grpcfd v1.1.2 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/ftrvxmtrx/fd v0.0.0-20150925145434-c6d800382fff // indirect
Expand Down
96 changes: 49 additions & 47 deletions pkg/tools/vl3lb/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"net"
"time"

"github.com/edwarnicke/serialize"
"github.com/edwarnicke/genericsync"
"go.fd.io/govpp/api"

"github.com/networkservicemesh/govpp/binapi/cnat"
Expand Down Expand Up @@ -55,8 +55,7 @@ type Handler struct {

// [vl3-NSE] --> [connID]*Endpoint
// We store it this way because the plugin does not add, but only updates existing entries. Therefore, to add/delete one entry, we must also pass the old ones.
servers map[string]map[string]*Endpoint
executor serialize.Executor
servers genericsync.Map[string, *genericsync.Map[string, *Endpoint]]
}

// NewHandler creates a Handler.
Expand All @@ -72,7 +71,6 @@ func NewHandler(vppConn api.Connection, endpoint *Endpoint, proto ip_types.IPPro
proto: proto,
isRealIP: 1,
lbType: cnat.CNAT_LB_TYPE_MAGLEV,
servers: make(map[string]map[string]*Endpoint),
}
}

Expand All @@ -85,71 +83,72 @@ func cnatTranslationString(c *cnat.CnatTranslation) string {
}

// AddServers adds the real servers to the VPP plugin
func (c *Handler) AddServers(ctx context.Context, vl3NSEName string, add map[string]*Endpoint) error {
var err error
<-c.executor.AsyncExec(func() {
if _, ok := c.servers[vl3NSEName]; !ok {
c.servers[vl3NSEName] = make(map[string]*Endpoint)
}

updateRequired := false
for k, v := range add {
if endpoint, ok := c.servers[vl3NSEName][k]; !ok || !endpoint.Equals(v) {
c.servers[vl3NSEName][k] = v
updateRequired = true
}
func (c *Handler) AddServers(ctx context.Context, vl3NSEName string, add map[string]*Endpoint) (err error) {
updateRequired := false
realServers, _ := c.servers.LoadOrStore(vl3NSEName, new(genericsync.Map[string, *Endpoint]))
for k, v := range add {
if endpoint, ok := realServers.Load(k); !ok || !endpoint.Equals(v) {
realServers.Store(k, v)
updateRequired = true
}
}

if updateRequired {
err = c.updateVPPCnat(ctx)
}
})
if updateRequired {
err = c.updateVPPCnat(ctx)
}

return err
}

// DeleteServers deletes the real servers from the VPP plugin
func (c *Handler) DeleteServers(ctx context.Context, vl3NSEName string, del []string) error {
var err error
<-c.executor.AsyncExec(func() {
if _, ok := c.servers[vl3NSEName]; !ok {
return
}
func (c *Handler) DeleteServers(ctx context.Context, vl3NSEName string, del []string) (err error) {
realServers, ok := c.servers.Load(vl3NSEName)
if !ok {
return nil
}

updateRequired := false
for _, id := range del {
delete(c.servers[vl3NSEName], id)
updateRequired = true
}
if len(c.servers[vl3NSEName]) == 0 {
updateRequired := false
for _, id := range del {
realServers.Delete(id)
updateRequired = true
}

if updateRequired {
var length int
realServers.Range(func(key string, value *Endpoint) bool {
length++
return true
})

if length == 0 {
log.FromContext(ctx).WithField("vl3Loadbalancer", "DeleteServers").Infof("Delete VL3NSE: %s ", vl3NSEName)
delete(c.servers, vl3NSEName)
c.servers.Delete(vl3NSEName)
}

if updateRequired {
err = c.updateVPPCnat(ctx)
}
})
err = c.updateVPPCnat(ctx)
}

return err
}

// GetServerIDsByVL3Name returns the list of the servers belonging to the vl3-NSE
func (c *Handler) GetServerIDsByVL3Name(vl3NSEName string) []string {
var list []string
<-c.executor.AsyncExec(func() {
for id := range c.servers[vl3NSEName] {
list = append(list, id)
}
})
realServers, loaded := c.servers.Load(vl3NSEName)
if loaded {
realServers.Range(func(key string, value *Endpoint) bool {
list = append(list, key)
return true
})
}
return list
}

// should work under executor
func (c *Handler) updateVPPCnat(ctx context.Context) error {
var paths []cnat.CnatEndpointTuple
for _, vl3Servers := range c.servers {
for _, s := range vl3Servers {
c.servers.Range(func(key string, realServers *genericsync.Map[string, *Endpoint]) bool {
realServers.Range(func(key string, s *Endpoint) bool {
paths = append(paths, cnat.CnatEndpointTuple{
DstEp: cnat.CnatEndpoint{
Addr: types.ToVppAddress(s.IP),
Expand All @@ -161,8 +160,11 @@ func (c *Handler) updateVPPCnat(ctx context.Context) error {
SwIfIndex: interface_types.InterfaceIndex(^uint32(0)),
},
})
}
}
return true
})
return true
})

if len(paths) == 0 {
now := time.Now()
cnatTranslationDel := cnat.CnatTranslationDel{ID: 0}
Expand Down

0 comments on commit 8c44798

Please sign in to comment.