Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix watching VPP events #1640

Merged
merged 3 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/govppmux/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Config struct {

func DefaultConfig() *Config {
return &Config{
ReconnectResync: true,
HealthCheckProbeInterval: time.Second,
HealthCheckReplyTimeout: 250 * time.Millisecond,
HealthCheckThreshold: 1,
Expand Down
57 changes: 45 additions & 12 deletions plugins/govppmux/plugin_impl_govppmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@ type Plugin struct {
vpeHandler vppcalls.VppCoreAPI

binapiVersion vpp.Version
vppAdapter adapter.VppAPI
vppConn *govpp.Connection
vppConChan chan govpp.ConnectionEvent
lastConnErr error
vppapiChan govppapi.Channel
lastPID uint32

onnConnectMu sync.Mutex
onConnects []func()

statsMu sync.Mutex
statsAdapter adapter.StatsAPI
Expand Down Expand Up @@ -91,7 +96,6 @@ func (p *Plugin) Init() (err error) {
if p.config, err = p.loadConfig(); err != nil {
return err
}

p.Log.Debugf("config: %+v", p.config)

// set GoVPP config
Expand All @@ -100,9 +104,6 @@ func (p *Plugin) Init() (err error) {
govpp.HealthCheckThreshold = p.config.HealthCheckThreshold
govpp.DefaultReplyTimeout = p.config.ReplyTimeout

// register REST API handlers
p.registerHandlers(p.HTTPHandlers)

var address string
useShm := disabledSocketClient || p.config.ConnectViaShm || p.config.ShmPrefix != ""
if useShm {
Expand All @@ -111,13 +112,19 @@ func (p *Plugin) Init() (err error) {
address = p.config.BinAPISocketPath
}

p.Log.Debugf("found %d registered VPP handlers", len(vpp.GetHandlers()))
for name, handler := range vpp.GetHandlers() {
versions := handler.Versions()
p.Log.Debugf("- handler: %-10s has %d versions: %v", name, len(versions), versions)
}

// TODO: Async connect & automatic reconnect support is not yet implemented in the agent,
// so synchronously wait until connected to VPP.
startTime := time.Now()
p.Log.Debugf("connecting to VPP..")

vppAdapter := NewVppAdapter(address, useShm)
p.vppConn, p.vppConChan, err = govpp.AsyncConnect(vppAdapter, p.config.RetryConnectCount, p.config.RetryConnectTimeout)
p.vppAdapter = NewVppAdapter(address, useShm)
p.vppConn, p.vppConChan, err = govpp.AsyncConnect(p.vppAdapter, p.config.RetryConnectCount, p.config.RetryConnectTimeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -170,6 +177,9 @@ func (p *Plugin) Init() (err error) {
p.Log.Infof("VPP proxy ready")
}

// register REST API handlers
p.registerHandlers(p.HTTPHandlers)

return nil
}

Expand Down Expand Up @@ -317,7 +327,7 @@ func (p *Plugin) updateVPPInfo() (err error) {
if err != nil {
return err
}

// sort plugins by name
sort.Slice(plugins, func(i, j int) bool { return plugins[i].Name < plugins[j].Name })

p.Log.Debugf("VPP loaded %d plugins", len(plugins))
Expand All @@ -334,15 +344,29 @@ func (p *Plugin) updateVPPInfo() (err error) {
}
p.infoMu.Unlock()

p.Log.Debugf("found %d registered VPP handlers", len(vpp.GetHandlers()))
for name, handler := range vpp.GetHandlers() {
versions := handler.Versions()
p.Log.Debugf("- handler: %-10s has %d versions: %v", name, len(versions), versions)
if p.lastPID != 0 && p.lastPID != session.PID {
p.Log.Warnf("VPP has restarted (previous PID: %d)", p.lastPID)
p.onConnect()
}
p.lastPID = session.PID

return nil
}

// OnReconnect registers fn in the list of callbacks that onConnect runs.
// Callbacks are kept in registration order.
//
// A nil fn is ignored: storing it would make onConnect panic when it later
// tries to call it.
func (p *Plugin) OnReconnect(fn func()) {
	if fn == nil {
		return
	}

	p.onnConnectMu.Lock()
	defer p.onnConnectMu.Unlock()
	p.onConnects = append(p.onConnects, fn)
}

// onConnect runs every callback registered via OnReconnect, in registration
// order.
//
// The callback list is copied while holding the mutex and the callbacks are
// invoked only after the mutex has been released. This lets a callback call
// OnReconnect itself without deadlocking on the (non-reentrant) mutex.
// Callbacks registered while onConnect is running are not part of the copy
// and therefore run on the next call only.
func (p *Plugin) onConnect() {
	p.onnConnectMu.Lock()
	callbacks := make([]func(), len(p.onConnects))
	copy(callbacks, p.onConnects)
	p.onnConnectMu.Unlock()

	for _, fn := range callbacks {
		fn()
	}
}

// handleVPPConnectionEvents handles VPP connection events.
func (p *Plugin) handleVPPConnectionEvents(ctx context.Context) {
defer p.wg.Done()
Expand All @@ -356,6 +380,8 @@ func (p *Plugin) handleVPPConnectionEvents(ctx context.Context) {
return
}

p.Log.Debugf("VPP connection state changed: %+v", event)

if event.State == govpp.Connected {
if err := p.updateVPPInfo(); err != nil {
p.Log.Errorf("updating VPP info failed: %v", err)
Expand All @@ -378,8 +404,15 @@ func (p *Plugin) handleVPPConnectionEvents(ctx context.Context) {

p.lastConnErr = errors.Errorf("VPP connection lost (event: %+v)", event)
p.StatusCheck.ReportStateChange(p.PluginName, statuscheck.Error, p.lastConnErr)

// TODO: fix reconnecting after reaching maximum reconnect attempts
// current implementation won't work with already created govpp channels
// keep reconnecting
/*if event.State == govpp.Failed {
p.vppConn, p.vppConChan, _ = govpp.AsyncConnect(p.vppAdapter, p.config.RetryConnectCount, p.config.RetryConnectTimeout)
}*/
} else {
p.Log.Debugf("VPP connection state: %+v", event)
p.Log.Warnf("unknown VPP connection state: %+v", event)
}

p.infoMu.Lock()
Expand Down
6 changes: 3 additions & 3 deletions plugins/vpp/ifplugin/descriptor/dhcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,16 @@ func (d *DHCPDescriptor) watchDHCPNotifications(ctx context.Context) {
defer d.wg.Done()
d.log.Debug("Started watcher on DHCP notifications")

dhcpChan := make(chan *vppcalls.Lease)
if err := d.ifHandler.WatchDHCPLeases(dhcpChan); err != nil {
dhcpChan := make(chan *vppcalls.Lease, 10)
if err := d.ifHandler.WatchDHCPLeases(ctx, dhcpChan); err != nil {
d.log.Errorf("watching dhcp leases failed: %v", err)
return
}

for {
select {
case lease := <-dhcpChan:
// interface logical name
// Get interface logical name
ifName, _, found := d.ifIndex.LookupBySwIfIndex(lease.SwIfIndex)
if !found {
d.log.Warnf("Interface sw_if_index=%d with DHCP lease was not found in the mapping", lease.SwIfIndex)
Expand Down
3 changes: 1 addition & 2 deletions plugins/vpp/ifplugin/descriptor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,7 @@ func (d *InterfaceDescriptor) EquivalentInterfaces(key string, oldIntf, newIntf
}

// compare MAC addresses case-insensitively (also handle unspecified MAC address)
if newIntf.PhysAddress != "" &&
strings.ToLower(oldIntf.PhysAddress) != strings.ToLower(newIntf.PhysAddress) {
if newIntf.PhysAddress != "" && !strings.EqualFold(oldIntf.PhysAddress, newIntf.PhysAddress) {
return false
}

Expand Down
4 changes: 3 additions & 1 deletion plugins/vpp/ifplugin/descriptor/interface_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/pkg/errors"

"go.ligato.io/vpp-agent/v3/plugins/vpp"

"go.ligato.io/vpp-agent/v3/pkg/models"
Expand Down Expand Up @@ -61,8 +62,9 @@ func (d *InterfaceDescriptor) Create(key string, intf *interfaces.Interface) (me
d.log.Error(err)
return nil, err
}
ifIdx, err = d.ifHandler.AddMemifInterface(context.TODO(), intf.Name, intf.GetMemif(), socketID)
ifIdx, err = d.ifHandler.AddMemifInterface(ctx, intf.Name, intf.GetMemif(), socketID)
if err != nil {
err := errors.WithMessagef(err, "adding memif interface %s (socketID: %d) failed", intf.Name, socketID)
d.log.Error(err)
return nil, err
}
Expand Down
5 changes: 1 addition & 4 deletions plugins/vpp/ifplugin/descriptor/interface_vrf.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@ func (d *InterfaceVrfDescriptor) IsInterfaceVrfKey(key string) bool {
return true
}
_, _, isIfaceInherVrfKey := interfaces.ParseInterfaceInheritedVrfKey(key)
if isIfaceInherVrfKey {
return true
}
return false
return isIfaceInherVrfKey
}

// Create puts interface into the given VRF table.
Expand Down
4 changes: 1 addition & 3 deletions plugins/vpp/ifplugin/ifplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ type IfPlugin struct {
// descriptors
linkStateDescriptor *descriptor.LinkStateDescriptor
dhcpDescriptor *descriptor.DHCPDescriptor
spanDescriptor *descriptor.SpanDescriptor

// from config file
defaultMtu uint32
Expand Down Expand Up @@ -243,8 +242,7 @@ func (p *IfPlugin) Init() (err error) {
}
}

err = p.ifStateUpdater.Init(p.ctx, p.Log, p.KVScheduler, p.VPP, p.intfIndex,
ifNotifHandler, p.publishStats)
err = p.ifStateUpdater.Init(p.ctx, p.Log, p.KVScheduler, p.VPP, p.intfIndex, ifNotifHandler, p.publishStats)
if err != nil {
return err
}
Expand Down
49 changes: 23 additions & 26 deletions plugins/vpp/ifplugin/interface_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ type InterfaceStateUpdater struct {

ifMetaChan chan ifaceidx.IfaceMetadataDto

ifHandler vppcalls.InterfaceVppAPI
ifEvents chan *vppcalls.InterfaceEvent
ifHandler vppcalls.InterfaceVppAPI
ifEvents chan *vppcalls.InterfaceEvent
cancelIfEvents func()

ifsForUpdate map[uint32]struct{}
lastIfCounters map[uint32]govppapi.InterfaceCounters
Expand All @@ -60,6 +61,7 @@ type InterfaceStateUpdater struct {
lastIfNotif time.Time
lastIfMeta time.Time

ctx context.Context
cancel context.CancelFunc // cancel can be used to cancel all goroutines and their jobs inside of the plugin
wg sync.WaitGroup // wait group that allows to wait until all goroutines of the plugin have finished
}
Expand Down Expand Up @@ -96,61 +98,57 @@ func (c *InterfaceStateUpdater) Init(
c.ifEvents = make(chan *vppcalls.InterfaceEvent, 1000)

// Create child context
var childCtx context.Context
childCtx, c.cancel = context.WithCancel(ctx)
c.ctx, c.cancel = context.WithCancel(ctx)

// Watch for incoming notifications
c.wg.Add(1)
go c.watchVPPNotifications(childCtx)
go c.watchVPPNotifications(c.ctx)

// Periodically read VPP counters and combined counters for VPP statistics
if disableInterfaceStats {
c.log.Warnf("reading interface stats is DISABLED!")
} else if readCounters {
c.wg.Add(1)
go c.startReadingCounters(childCtx)
go c.startReadingCounters(c.ctx)
}

if disableStatusPublishing {
c.log.Warnf("publishing interface status is DISABLED!")
} else {
c.wg.Add(1)
go c.startUpdatingIfStateDetails(childCtx)
go c.startUpdatingIfStateDetails(c.ctx)
}

return nil
}

// AfterInit subscribes for watching VPP notifications on previously initialized channel
func (c *InterfaceStateUpdater) AfterInit() error {
err := c.subscribeVPPNotifications()
if err != nil {
if err := c.subscribeVPPNotifications(c.ctx); err != nil {
return err
}
return nil
}

// subscribeVPPNotifications subscribes for interface state notifications from VPP.
func (c *InterfaceStateUpdater) subscribeVPPNotifications() error {
if err := c.ifHandler.WatchInterfaceEvents(c.ifEvents); err != nil {
return err
}

c.vppClient.OnReconnect(func() {
c.cancelIfEvents()
if err := c.subscribeVPPNotifications(c.ctx); err != nil {
c.log.Warnf("WatchInterfaceEvents failed: %v", err)
}
})
return nil
}

// Close cancels the updater's context and then blocks until the wait group
// reports that the goroutines tracked by it have finished. It always returns
// nil.
//
// The cancel function is only called when it has been set, so calling Close
// on an updater whose context was never created does not panic on a nil
// function call.
func (c *InterfaceStateUpdater) Close() error {
	if c.cancel != nil {
		c.cancel()
	}
	c.wg.Wait()
	return nil
}

// TODO: handle unsubscribing
/*if c.vppNotifSubs != nil {
if err := c.vppNotifSubs.Unsubscribe(); err != nil {
return errors.Errorf("failed to unsubscribe interface state notification on close: %v", err)
}
}*/

// subscribeVPPNotifications subscribes for interface state notifications from
// VPP by calling WatchInterfaceEvents on the interface handler with c.ifEvents
// as the events channel.
//
// The watch is started with a child context derived from ctx. The child's
// cancel function is stored in c.cancelIfEvents so that this subscription can
// be cancelled without cancelling ctx.
//
// If starting the watch fails, the child context is cancelled immediately so
// it does not stay attached to ctx until ctx itself is cancelled, and the
// error is returned. c.cancelIfEvents remains set in that case; calling a
// cancel function more than once is safe.
func (c *InterfaceStateUpdater) subscribeVPPNotifications(ctx context.Context) error {
	watchCtx, cancel := context.WithCancel(ctx)
	c.cancelIfEvents = cancel

	if err := c.ifHandler.WatchInterfaceEvents(watchCtx, c.ifEvents); err != nil {
		cancel()
		return err
	}
	return nil
}

Expand Down Expand Up @@ -387,7 +385,6 @@ func (c *InterfaceStateUpdater) getIfStateDataWLookup(ifIdx uint32) (*intf.Inter
Name: ifName,
Statistics: &intf.InterfaceState_Statistics{},
}

c.ifState[ifIdx] = ifState
found = true
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/vpp/ifplugin/vppcalls/interface_handler_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ type InterfaceVppRead interface {
// DumpDhcpClients dumps DHCP-related information for all interfaces.
DumpDhcpClients() (map[uint32]*Dhcp, error)
// WatchInterfaceEvents starts watching for interface events.
WatchInterfaceEvents(ch chan<- *InterfaceEvent) error
WatchInterfaceEvents(ctx context.Context, events chan<- *InterfaceEvent) error
// WatchDHCPLeases starts watching for DHCP leases.
WatchDHCPLeases(ch chan<- *Lease) error
WatchDHCPLeases(ctx context.Context, leases chan<- *Lease) error
}

var Handler = vpp.RegisterHandler(vpp.HandlerDesc{
Expand Down
Loading