From dc82f9edf2a61e778197aa195f650e6ad76218f8 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Fri, 13 Mar 2020 12:39:10 +0100 Subject: [PATCH 1/3] Set ReconnectResync default value in govpp config to true Signed-off-by: Ondrej Fabry --- plugins/govppmux/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/govppmux/config.go b/plugins/govppmux/config.go index 1a924333e3..be67163985 100644 --- a/plugins/govppmux/config.go +++ b/plugins/govppmux/config.go @@ -63,6 +63,7 @@ type Config struct { func DefaultConfig() *Config { return &Config{ + ReconnectResync: true, HealthCheckProbeInterval: time.Second, HealthCheckReplyTimeout: 250 * time.Millisecond, HealthCheckThreshold: 1, From b42da7dad0dd4ee951264ae296eb2881533f1208 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Fri, 13 Mar 2020 12:43:07 +0100 Subject: [PATCH 2/3] Refactor watching VPP events to support cancelling Signed-off-by: Ondrej Fabry --- plugins/govppmux/plugin_impl_govppmux.go | 57 ++++-- .../vppcalls/interface_handler_api.go | 4 +- .../vppcalls/vpp1904/watch_vppcalls.go | 188 ++++++++++++------ .../vppcalls/vpp1904/watch_vppcalls_test.go | 9 +- .../vppcalls/vpp1908/watch_vppcalls.go | 188 ++++++++++++------ .../vppcalls/vpp1908/watch_vppcalls_test.go | 9 +- .../vppcalls/vpp2001/watch_vppcalls.go | 169 +++++++++++----- .../vppcalls/vpp2001/watch_vppcalls_test.go | 13 +- 8 files changed, 441 insertions(+), 196 deletions(-) diff --git a/plugins/govppmux/plugin_impl_govppmux.go b/plugins/govppmux/plugin_impl_govppmux.go index 1ae14fa4f4..bb4c853d7d 100644 --- a/plugins/govppmux/plugin_impl_govppmux.go +++ b/plugins/govppmux/plugin_impl_govppmux.go @@ -57,10 +57,15 @@ type Plugin struct { vpeHandler vppcalls.VppCoreAPI binapiVersion vpp.Version + vppAdapter adapter.VppAPI vppConn *govpp.Connection vppConChan chan govpp.ConnectionEvent lastConnErr error vppapiChan govppapi.Channel + lastPID uint32 + + onnConnectMu sync.Mutex + onConnects []func() statsMu sync.Mutex statsAdapter adapter.StatsAPI @@ -91,7 +96,6 @@ func (p *Plugin) Init() (err error) { if p.config, err = p.loadConfig(); err != nil { return err } - p.Log.Debugf("config: %+v", p.config) // set GoVPP config @@ -100,9 +104,6 @@ func (p *Plugin) Init() (err error) { govpp.HealthCheckThreshold = p.config.HealthCheckThreshold govpp.DefaultReplyTimeout = p.config.ReplyTimeout - // register REST API handlers - p.registerHandlers(p.HTTPHandlers) - var address string useShm := disabledSocketClient || p.config.ConnectViaShm || p.config.ShmPrefix != "" if useShm { @@ -111,13 +112,19 @@ func (p *Plugin) Init() (err error) { address = p.config.BinAPISocketPath } + p.Log.Debugf("found %d registered VPP handlers", len(vpp.GetHandlers())) + for name, handler := range vpp.GetHandlers() { + versions := handler.Versions() + p.Log.Debugf("- handler: %-10s has %d versions: %v", name, len(versions), versions) + } + // TODO: Async connect & automatic reconnect support is not yet implemented in the agent, // so synchronously wait until connected to VPP. startTime := time.Now() p.Log.Debugf("connecting to VPP..") - vppAdapter := NewVppAdapter(address, useShm) - p.vppConn, p.vppConChan, err = govpp.AsyncConnect(vppAdapter, p.config.RetryConnectCount, p.config.RetryConnectTimeout) + p.vppAdapter = NewVppAdapter(address, useShm) + p.vppConn, p.vppConChan, err = govpp.AsyncConnect(p.vppAdapter, p.config.RetryConnectCount, p.config.RetryConnectTimeout) if err != nil { return err } @@ -170,6 +177,9 @@ func (p *Plugin) Init() (err error) { p.Log.Infof("VPP proxy ready") } + // register REST API handlers + p.registerHandlers(p.HTTPHandlers) + return nil } @@ -317,7 +327,7 @@ func (p *Plugin) updateVPPInfo() (err error) { if err != nil { return err } - + // sort plugins by name sort.Slice(plugins, func(i, j int) bool { return plugins[i].Name < plugins[j].Name }) p.Log.Debugf("VPP loaded %d plugins", len(plugins)) @@ -334,15 +344,29 @@ func (p *Plugin) updateVPPInfo() (err error) { } p.infoMu.Unlock() - p.Log.Debugf("found %d registered VPP handlers", len(vpp.GetHandlers())) - for name, handler := range vpp.GetHandlers() { - versions := handler.Versions() - p.Log.Debugf("- handler: %-10s has %d versions: %v", name, len(versions), versions) + if p.lastPID != 0 && p.lastPID != session.PID { + p.Log.Warnf("VPP has restarted (previous PID: %d)", p.lastPID) + p.onConnect() } + p.lastPID = session.PID return nil } +func (p *Plugin) OnReconnect(fn func()) { + p.onnConnectMu.Lock() + defer p.onnConnectMu.Unlock() + p.onConnects = append(p.onConnects, fn) +} + +func (p *Plugin) onConnect() { + p.onnConnectMu.Lock() + defer p.onnConnectMu.Unlock() + for _, fn := range p.onConnects { + fn() + } +} + // handleVPPConnectionEvents handles VPP connection events. func (p *Plugin) handleVPPConnectionEvents(ctx context.Context) { defer p.wg.Done() @@ -356,6 +380,8 @@ func (p *Plugin) handleVPPConnectionEvents(ctx context.Context) { return } + p.Log.Debugf("VPP connection state changed: %+v", event) + if event.State == govpp.Connected { if err := p.updateVPPInfo(); err != nil { p.Log.Errorf("updating VPP info failed: %v", err) @@ -378,8 +404,15 @@ func (p *Plugin) handleVPPConnectionEvents(ctx context.Context) { p.lastConnErr = errors.Errorf("VPP connection lost (event: %+v)", event) p.StatusCheck.ReportStateChange(p.PluginName, statuscheck.Error, p.lastConnErr) + + // TODO: fix reconnecting after reaching maximum reconnect attempts + // current implementation wont work with already created govpp channels + // keep reconnecting + /*if event.State == govpp.Failed { + p.vppConn, p.vppConChan, _ = govpp.AsyncConnect(p.vppAdapter, p.config.RetryConnectCount, p.config.RetryConnectTimeout) + }*/ } else { - p.Log.Debugf("VPP connection state: %+v", event) + p.Log.Warnf("unknown VPP connection state: %+v", event) } p.infoMu.Lock() diff --git a/plugins/vpp/ifplugin/vppcalls/interface_handler_api.go b/plugins/vpp/ifplugin/vppcalls/interface_handler_api.go index 9efb22f141..ad43844583 100644 --- a/plugins/vpp/ifplugin/vppcalls/interface_handler_api.go +++ b/plugins/vpp/ifplugin/vppcalls/interface_handler_api.go @@ -269,9 +269,9 @@ type InterfaceVppRead interface { // DumpDhcpClients dumps DHCP-related information for all interfaces. DumpDhcpClients() (map[uint32]*Dhcp, error) // WatchInterfaceEvents starts watching for interface events. - WatchInterfaceEvents(ch chan<- *InterfaceEvent) error + WatchInterfaceEvents(ctx context.Context, events chan<- *InterfaceEvent) error // WatchDHCPLeases starts watching for DHCP leases. - WatchDHCPLeases(ch chan<- *Lease) error + WatchDHCPLeases(ctx context.Context, leases chan<- *Lease) error } var Handler = vpp.RegisterHandler(vpp.HandlerDesc{ diff --git a/plugins/vpp/ifplugin/vppcalls/vpp1904/watch_vppcalls.go b/plugins/vpp/ifplugin/vppcalls/vpp1904/watch_vppcalls.go index fee7d1c374..7e793d739f 100644 --- a/plugins/vpp/ifplugin/vppcalls/vpp1904/watch_vppcalls.go +++ b/plugins/vpp/ifplugin/vppcalls/vpp1904/watch_vppcalls.go @@ -16,6 +16,7 @@ package vpp1904 import ( "bytes" + "context" "fmt" "net" "os" @@ -24,63 +25,83 @@ import ( govppapi "git.fd.io/govpp.git/api" "github.com/pkg/errors" - "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1904/dhcp" - binapi_interfaces "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1904/interfaces" + vpp_dhcp "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1904/dhcp" + vpp_ifs "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1904/interfaces" "go.ligato.io/vpp-agent/v3/plugins/vpp/ifplugin/vppcalls" ) -var InterfaceEventTimeout = time.Second +var ( + // EventDeliverTimeout defines maximum time to deliver event upstream. + EventDeliverTimeout = time.Second + // NotifChanBufferSize defines size of notification channel buffer. + NotifChanBufferSize = 10 +) -func (h *InterfaceVppHandler) WatchInterfaceEvents(events chan<- *vppcalls.InterfaceEvent) error { - notifChan := make(chan govppapi.Message, 10) +func (h *InterfaceVppHandler) WatchInterfaceEvents(ctx context.Context, eventsCh chan<- *vppcalls.InterfaceEvent) error { + notifChan := make(chan govppapi.Message, NotifChanBufferSize) - // subscribe for receiving SwInterfaceEvents notifications - vppNotifSubs, err := h.callsChannel.SubscribeNotification(notifChan, &binapi_interfaces.SwInterfaceEvent{}) + // subscribe to SwInterfaceEvent notifications + sub, err := h.callsChannel.SubscribeNotification(notifChan, &vpp_ifs.SwInterfaceEvent{}) if err != nil { - return errors.Errorf("failed to subscribe VPP notification (sw_interface_event): %v", err) + return errors.Errorf("subscribing to VPP notification (sw_interface_event) failed: %v", err) + } + unsub := func() { + if err := sub.Unsubscribe(); err != nil { + h.log.Warnf("unsubscribing VPP notification (sw_interface_event) failed: %v", err) + } } - _ = vppNotifSubs go func() { + h.log.Debugf("start watching interface events") + defer h.log.Debugf("done watching interface events (%v)", ctx.Err()) + for { select { - case e, ok := <-notifChan: - if !ok { - h.log.Debugf("interface notification channel was closed") + case e, open := <-notifChan: + if !open { + h.log.Debugf("interface events channel was closed") + unsub() return } - ifEvent, ok := e.(*binapi_interfaces.SwInterfaceEvent) + + ifEvent, ok := e.(*vpp_ifs.SwInterfaceEvent) if !ok { + h.log.Debugf("unexpected notification type: %#v", ifEvent) continue } - event := &vppcalls.InterfaceEvent{ - SwIfIndex: ifEvent.SwIfIndex, - AdminState: ifEvent.AdminUpDown, - LinkState: ifEvent.LinkUpDown, - Deleted: ifEvent.Deleted != 0, + + // try to send event + select { + case eventsCh <- toInterfaceEvent(ifEvent): + // sent ok + case <-ctx.Done(): + unsub() + return + default: + // channel full send event in goroutine for later processing + go func() { + select { + case eventsCh <- toInterfaceEvent(ifEvent): + // sent ok + case <-time.After(EventDeliverTimeout): + h.log.Warnf("unable to deliver interface event, dropping it: %+v", ifEvent) + } + }() } - // send event in goroutine for quick processing - go func() { - select { - case events <- event: - // sent ok - case <-time.After(InterfaceEventTimeout): - h.log.Warnf("unable to deliver interface event, dropping it") - } - }() + case <-ctx.Done(): + unsub() + return } } }() - // enable interface state notifications from VPP - wantIfEventsReply := &binapi_interfaces.WantInterfaceEventsReply{} - err = h.callsChannel.SendRequest(&binapi_interfaces.WantInterfaceEvents{ + // enable interface events from VPP + if _, err := h.interfaces.WantInterfaceEvents(ctx, &vpp_ifs.WantInterfaceEvents{ PID: uint32(os.Getpid()), EnableDisable: 1, - }).ReceiveReply(wantIfEventsReply) - if err != nil { - if err == govppapi.VPPApiError(govppapi.INVALID_REGISTRATION) { - h.log.Warnf("already registered for watch interface events: %v", err) + }); err != nil { + if errors.Is(err, govppapi.VPPApiError(govppapi.INVALID_REGISTRATION)) { + h.log.Warnf("already subscribed to interface events: %v", err) return nil } return errors.Errorf("failed to watch interface events: %v", err) @@ -89,45 +110,94 @@ func (h *InterfaceVppHandler) WatchInterfaceEvents(events chan<- *vppcalls.Inter return nil } -func (h *InterfaceVppHandler) WatchDHCPLeases(leasesCh chan<- *vppcalls.Lease) error { - notifChan := make(chan govppapi.Message) +func (h *InterfaceVppHandler) WatchDHCPLeases(ctx context.Context, leasesCh chan<- *vppcalls.Lease) error { + notifChan := make(chan govppapi.Message, NotifChanBufferSize) - // subscribe for receiving SwInterfaceEvents notifications - vppNotifSubs, err := h.callsChannel.SubscribeNotification(notifChan, &dhcp.DHCPComplEvent{}) + // subscribe for receiving DHCPComplEvent notifications + sub, err := h.callsChannel.SubscribeNotification(notifChan, &vpp_dhcp.DHCPComplEvent{}) if err != nil { - return errors.Errorf("failed to subscribe VPP notification (sw_interface_event): %v", err) + return errors.Errorf("subscribing to VPP notification (dhcp_compl_event) failed: %v", err) + } + unsub := func() { + if err := sub.Unsubscribe(); err != nil { + h.log.Warnf("unsubscribing VPP notification (dhcp_compl_event) failed: %v", err) + } } - _ = vppNotifSubs go func() { + h.log.Debugf("start watching DHCP leases") + defer h.log.Debugf("done watching DHCP lease", ctx.Err()) + for { select { - case e := <-notifChan: - dhcpEvent, ok := e.(*dhcp.DHCPComplEvent) + case e, open := <-notifChan: + if !open { + h.log.Debugf("interface notification channel was closed") + unsub() + return + } + + dhcpEvent, ok := e.(*vpp_dhcp.DHCPComplEvent) if !ok { + h.log.Debugf("unexpected notification type: %#v", dhcpEvent) continue } - lease := dhcpEvent.Lease - var hostAddr, routerAddr string - if uintToBool(lease.IsIPv6) { - hostAddr = fmt.Sprintf("%s/%d", net.IP(lease.HostAddress).To16().String(), uint32(lease.MaskWidth)) - routerAddr = fmt.Sprintf("%s/%d", net.IP(lease.RouterAddress).To16().String(), uint32(lease.MaskWidth)) - } else { - hostAddr = fmt.Sprintf("%s/%d", net.IP(lease.HostAddress[:4]).To4().String(), uint32(lease.MaskWidth)) - routerAddr = fmt.Sprintf("%s/%d", net.IP(lease.RouterAddress[:4]).To4().String(), uint32(lease.MaskWidth)) - } - leasesCh <- &vppcalls.Lease{ - SwIfIndex: lease.SwIfIndex, - State: lease.State, - Hostname: string(bytes.SplitN(lease.Hostname, []byte{0x00}, 2)[0]), - IsIPv6: uintToBool(lease.IsIPv6), - HostAddress: hostAddr, - RouterAddress: routerAddr, - HostMac: net.HardwareAddr(lease.HostMac).String(), + + // try to send event + select { + case leasesCh <- toDHCPLease(dhcpEvent): + // sent ok + case <-ctx.Done(): + unsub() + return + default: + // channel full send event in goroutine for later processing + go func() { + select { + case leasesCh <- toDHCPLease(dhcpEvent): + // sent ok + case <-time.After(EventDeliverTimeout): + h.log.Warnf("unable to deliver DHCP lease event, dropping it: %+v", dhcpEvent) + } + }() } + case <-ctx.Done(): + unsub() + return } } }() return nil } + +func toInterfaceEvent(ifEvent *vpp_ifs.SwInterfaceEvent) *vppcalls.InterfaceEvent { + event := &vppcalls.InterfaceEvent{ + SwIfIndex: ifEvent.SwIfIndex, + AdminState: ifEvent.AdminUpDown, + LinkState: ifEvent.LinkUpDown, + Deleted: ifEvent.Deleted != 0, + } + return event +} + +func toDHCPLease(dhcpEvent *vpp_dhcp.DHCPComplEvent) *vppcalls.Lease { + lease := dhcpEvent.Lease + var hostAddr, routerAddr string + if uintToBool(lease.IsIPv6) { + hostAddr = fmt.Sprintf("%s/%d", net.IP(lease.HostAddress).To16().String(), uint32(lease.MaskWidth)) + routerAddr = fmt.Sprintf("%s/%d", net.IP(lease.RouterAddress).To16().String(), uint32(lease.MaskWidth)) + } else { + hostAddr = fmt.Sprintf("%s/%d", net.IP(lease.HostAddress[:4]).To4().String(), uint32(lease.MaskWidth)) + routerAddr = fmt.Sprintf("%s/%d", net.IP(lease.RouterAddress[:4]).To4().String(), uint32(lease.MaskWidth)) + } + return &vppcalls.Lease{ + SwIfIndex: lease.SwIfIndex, + State: lease.State, + Hostname: string(bytes.TrimRight(lease.Hostname, "\x00")), + IsIPv6: uintToBool(lease.IsIPv6), + HostAddress: hostAddr, + RouterAddress: routerAddr, + HostMac: net.HardwareAddr(lease.HostMac[:]).String(), + } +} diff --git a/plugins/vpp/ifplugin/vppcalls/vpp1904/watch_vppcalls_test.go b/plugins/vpp/ifplugin/vppcalls/vpp1904/watch_vppcalls_test.go index 6b3f83422d..aa82111703 100644 --- a/plugins/vpp/ifplugin/vppcalls/vpp1904/watch_vppcalls_test.go +++ b/plugins/vpp/ifplugin/vppcalls/vpp1904/watch_vppcalls_test.go @@ -17,12 +17,11 @@ package vpp1904_test import ( "testing" - "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1904/interfaces" + . "github.com/onsi/gomega" "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1904/dhcp" + "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1904/interfaces" "go.ligato.io/vpp-agent/v3/plugins/vpp/ifplugin/vppcalls" - - . "github.com/onsi/gomega" ) func TestWatchInterfaceEvents(t *testing.T) { @@ -30,7 +29,7 @@ func TestWatchInterfaceEvents(t *testing.T) { defer ctx.TeardownTestCtx() ctx.MockVpp.MockReply(&interfaces.WantInterfaceEventsReply{}) eventsChan := make(chan *vppcalls.InterfaceEvent) - err := ifHandler.WatchInterfaceEvents(eventsChan) + err := ifHandler.WatchInterfaceEvents(ctx.Context, eventsChan) notifChan := ctx.MockChannel.GetChannel() Expect(notifChan).ToNot(BeNil()) Expect(err).To(BeNil()) @@ -82,7 +81,7 @@ func TestWatchDHCPLeases(t *testing.T) { ctx, ifHandler := ifTestSetup(t) defer ctx.TeardownTestCtx() leasesChChan := make(chan *vppcalls.Lease) - err := ifHandler.WatchDHCPLeases(leasesChChan) + err := ifHandler.WatchDHCPLeases(ctx.Context, leasesChChan) notifChan := ctx.MockChannel.GetChannel() Expect(notifChan).ToNot(BeNil()) Expect(err).To(BeNil()) diff --git a/plugins/vpp/ifplugin/vppcalls/vpp1908/watch_vppcalls.go b/plugins/vpp/ifplugin/vppcalls/vpp1908/watch_vppcalls.go index 9974d4fbae..93e6a6a347 100644 --- a/plugins/vpp/ifplugin/vppcalls/vpp1908/watch_vppcalls.go +++ b/plugins/vpp/ifplugin/vppcalls/vpp1908/watch_vppcalls.go @@ -16,6 +16,7 @@ package vpp1908 import ( "bytes" + "context" "fmt" "net" "os" @@ -24,63 +25,83 @@ import ( govppapi "git.fd.io/govpp.git/api" "github.com/pkg/errors" - "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1908/dhcp" - binapi_interfaces "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1908/interfaces" + vpp_dhcp "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1908/dhcp" + vpp_ifs "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1908/interfaces" "go.ligato.io/vpp-agent/v3/plugins/vpp/ifplugin/vppcalls" ) -var InterfaceEventTimeout = time.Second +var ( + // EventDeliverTimeout defines maximum time to deliver event upstream. + EventDeliverTimeout = time.Second + // NotifChanBufferSize defines size of notification channel buffer. + NotifChanBufferSize = 10 +) -func (h *InterfaceVppHandler) WatchInterfaceEvents(events chan<- *vppcalls.InterfaceEvent) error { - notifChan := make(chan govppapi.Message, 10) +func (h *InterfaceVppHandler) WatchInterfaceEvents(ctx context.Context, eventsCh chan<- *vppcalls.InterfaceEvent) error { + notifChan := make(chan govppapi.Message, NotifChanBufferSize) - // subscribe for receiving SwInterfaceEvents notifications - vppNotifSubs, err := h.callsChannel.SubscribeNotification(notifChan, &binapi_interfaces.SwInterfaceEvent{}) + // subscribe to SwInterfaceEvent notifications + sub, err := h.callsChannel.SubscribeNotification(notifChan, &vpp_ifs.SwInterfaceEvent{}) if err != nil { - return errors.Errorf("failed to subscribe VPP notification (sw_interface_event): %v", err) + return errors.Errorf("subscribing to VPP notification (sw_interface_event) failed: %v", err) + } + unsub := func() { + if err := sub.Unsubscribe(); err != nil { + h.log.Warnf("unsubscribing VPP notification (sw_interface_event) failed: %v", err) + } } - _ = vppNotifSubs go func() { + h.log.Debugf("start watching interface events") + defer h.log.Debugf("done watching interface events (%v)", ctx.Err()) + for { select { - case e, ok := <-notifChan: - if !ok { - h.log.Debugf("interface notification channel was closed") + case e, open := <-notifChan: + if !open { + h.log.Debugf("interface events channel was closed") + unsub() return } - ifEvent, ok := e.(*binapi_interfaces.SwInterfaceEvent) + + ifEvent, ok := e.(*vpp_ifs.SwInterfaceEvent) if !ok { + h.log.Debugf("unexpected notification type: %#v", ifEvent) continue } - event := &vppcalls.InterfaceEvent{ - SwIfIndex: ifEvent.SwIfIndex, - AdminState: ifEvent.AdminUpDown, - LinkState: ifEvent.LinkUpDown, - Deleted: ifEvent.Deleted != 0, + + // try to send event + select { + case eventsCh <- toInterfaceEvent(ifEvent): + // sent ok + case <-ctx.Done(): + unsub() + return + default: + // channel full send event in goroutine for later processing + go func() { + select { + case eventsCh <- toInterfaceEvent(ifEvent): + // sent ok + case <-time.After(EventDeliverTimeout): + h.log.Warnf("unable to deliver interface event, dropping it: %+v", ifEvent) + } + }() } - // send event in goroutine for quick processing - go func() { - select { - case events <- event: - // sent ok - case <-time.After(InterfaceEventTimeout): - h.log.Warnf("unable to deliver interface event, dropping it") - } - }() + case <-ctx.Done(): + unsub() + return } } }() - // enable interface state notifications from VPP - wantIfEventsReply := &binapi_interfaces.WantInterfaceEventsReply{} - err = h.callsChannel.SendRequest(&binapi_interfaces.WantInterfaceEvents{ + // enable interface events from VPP + if _, err := h.interfaces.WantInterfaceEvents(ctx, &vpp_ifs.WantInterfaceEvents{ PID: uint32(os.Getpid()), EnableDisable: 1, - }).ReceiveReply(wantIfEventsReply) - if err != nil { - if err == govppapi.VPPApiError(govppapi.INVALID_REGISTRATION) { - h.log.Warnf("already registered for watch interface events: %v", err) + }); err != nil { + if errors.Is(err, govppapi.VPPApiError(govppapi.INVALID_REGISTRATION)) { + h.log.Warnf("already subscribed to interface events: %v", err) return nil } return errors.Errorf("failed to watch interface events: %v", err) @@ -89,45 +110,94 @@ func (h *InterfaceVppHandler) WatchInterfaceEvents(events chan<- *vppcalls.Inter return nil } -func (h *InterfaceVppHandler) WatchDHCPLeases(leasesCh chan<- *vppcalls.Lease) error { - notifChan := make(chan govppapi.Message) +func (h *InterfaceVppHandler) WatchDHCPLeases(ctx context.Context, leasesCh chan<- *vppcalls.Lease) error { + notifChan := make(chan govppapi.Message, NotifChanBufferSize) - // subscribe for receiving SwInterfaceEvents notifications - vppNotifSubs, err := h.callsChannel.SubscribeNotification(notifChan, &dhcp.DHCPComplEvent{}) + // subscribe for receiving DHCPComplEvent notifications + sub, err := h.callsChannel.SubscribeNotification(notifChan, &vpp_dhcp.DHCPComplEvent{}) if err != nil { - return errors.Errorf("failed to subscribe VPP notification (sw_interface_event): %v", err) + return errors.Errorf("subscribing to VPP notification (dhcp_compl_event) failed: %v", err) + } + unsub := func() { + if err := sub.Unsubscribe(); err != nil { + h.log.Warnf("unsubscribing VPP notification (dhcp_compl_event) failed: %v", err) + } } - _ = vppNotifSubs go func() { + h.log.Debugf("start watching DHCP leases") + defer h.log.Debugf("done watching DHCP lease", ctx.Err()) + for { select { - case e := <-notifChan: - dhcpEvent, ok := e.(*dhcp.DHCPComplEvent) + case e, open := <-notifChan: + if !open { + h.log.Debugf("interface notification channel was closed") + unsub() + return + } + + dhcpEvent, ok := e.(*vpp_dhcp.DHCPComplEvent) if !ok { + h.log.Debugf("unexpected notification type: %#v", dhcpEvent) continue } - lease := dhcpEvent.Lease - var hostAddr, routerAddr string - if uintToBool(lease.IsIPv6) { - hostAddr = fmt.Sprintf("%s/%d", net.IP(lease.HostAddress).To16().String(), uint32(lease.MaskWidth)) - routerAddr = fmt.Sprintf("%s/%d", net.IP(lease.RouterAddress).To16().String(), uint32(lease.MaskWidth)) - } else { - hostAddr = fmt.Sprintf("%s/%d", net.IP(lease.HostAddress[:4]).To4().String(), uint32(lease.MaskWidth)) - routerAddr = fmt.Sprintf("%s/%d", net.IP(lease.RouterAddress[:4]).To4().String(), uint32(lease.MaskWidth)) - } - leasesCh <- &vppcalls.Lease{ - SwIfIndex: lease.SwIfIndex, - State: lease.State, - Hostname: string(bytes.SplitN(lease.Hostname, []byte{0x00}, 2)[0]), - IsIPv6: uintToBool(lease.IsIPv6), - HostAddress: hostAddr, - RouterAddress: routerAddr, - HostMac: net.HardwareAddr(lease.HostMac).String(), + + // try to send event + select { + case leasesCh <- toDHCPLease(dhcpEvent): + // sent ok + case <-ctx.Done(): + unsub() + return + default: + // channel full send event in goroutine for later processing + go func() { + select { + case leasesCh <- toDHCPLease(dhcpEvent): + // sent ok + case <-time.After(EventDeliverTimeout): + h.log.Warnf("unable to deliver DHCP lease event, dropping it: %+v", dhcpEvent) + } + }() } + case <-ctx.Done(): + unsub() + return } } }() return nil } + +func toInterfaceEvent(ifEvent *vpp_ifs.SwInterfaceEvent) *vppcalls.InterfaceEvent { + event := &vppcalls.InterfaceEvent{ + SwIfIndex: ifEvent.SwIfIndex, + AdminState: ifEvent.AdminUpDown, + LinkState: ifEvent.LinkUpDown, + Deleted: ifEvent.Deleted != 0, + } + return event +} + +func toDHCPLease(dhcpEvent *vpp_dhcp.DHCPComplEvent) *vppcalls.Lease { + lease := dhcpEvent.Lease + var hostAddr, routerAddr string + if uintToBool(lease.IsIPv6) { + hostAddr = fmt.Sprintf("%s/%d", net.IP(lease.HostAddress).To16().String(), uint32(lease.MaskWidth)) + routerAddr = fmt.Sprintf("%s/%d", net.IP(lease.RouterAddress).To16().String(), uint32(lease.MaskWidth)) + } else { + hostAddr = fmt.Sprintf("%s/%d", net.IP(lease.HostAddress[:4]).To4().String(), uint32(lease.MaskWidth)) + routerAddr = fmt.Sprintf("%s/%d", net.IP(lease.RouterAddress[:4]).To4().String(), uint32(lease.MaskWidth)) + } + return &vppcalls.Lease{ + SwIfIndex: lease.SwIfIndex, + State: lease.State, + Hostname: string(bytes.TrimRight(lease.Hostname, "\x00")), + IsIPv6: uintToBool(lease.IsIPv6), + HostAddress: hostAddr, + RouterAddress: routerAddr, + HostMac: net.HardwareAddr(lease.HostMac[:]).String(), + } +} diff --git a/plugins/vpp/ifplugin/vppcalls/vpp1908/watch_vppcalls_test.go b/plugins/vpp/ifplugin/vppcalls/vpp1908/watch_vppcalls_test.go index d147b0f73d..74c144f307 100644 --- a/plugins/vpp/ifplugin/vppcalls/vpp1908/watch_vppcalls_test.go +++ b/plugins/vpp/ifplugin/vppcalls/vpp1908/watch_vppcalls_test.go @@ -17,12 +17,11 @@ package vpp1908_test import ( "testing" - "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1908/interfaces" + . "github.com/onsi/gomega" "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1908/dhcp" + "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp1908/interfaces" "go.ligato.io/vpp-agent/v3/plugins/vpp/ifplugin/vppcalls" - - . "github.com/onsi/gomega" ) func TestWatchInterfaceEvents(t *testing.T) { @@ -30,7 +29,7 @@ func TestWatchInterfaceEvents(t *testing.T) { defer ctx.TeardownTestCtx() ctx.MockVpp.MockReply(&interfaces.WantInterfaceEventsReply{}) eventsChan := make(chan *vppcalls.InterfaceEvent) - err := ifHandler.WatchInterfaceEvents(eventsChan) + err := ifHandler.WatchInterfaceEvents(ctx.Context, eventsChan) notifChan := ctx.MockChannel.GetChannel() Expect(notifChan).ToNot(BeNil()) Expect(err).To(BeNil()) @@ -82,7 +81,7 @@ func TestWatchDHCPLeases(t *testing.T) { ctx, ifHandler := ifTestSetup(t) defer ctx.TeardownTestCtx() leasesChChan := make(chan *vppcalls.Lease) - err := ifHandler.WatchDHCPLeases(leasesChChan) + err := ifHandler.WatchDHCPLeases(ctx.Context, leasesChChan) notifChan := ctx.MockChannel.GetChannel() Expect(notifChan).ToNot(BeNil()) Expect(err).To(BeNil()) diff --git a/plugins/vpp/ifplugin/vppcalls/vpp2001/watch_vppcalls.go b/plugins/vpp/ifplugin/vppcalls/vpp2001/watch_vppcalls.go index 48fffcb60f..9fd98ee599 100644 --- a/plugins/vpp/ifplugin/vppcalls/vpp2001/watch_vppcalls.go +++ b/plugins/vpp/ifplugin/vppcalls/vpp2001/watch_vppcalls.go @@ -15,6 +15,7 @@ package vpp2001 import ( + "context" "net" "os" "strings" @@ -24,62 +25,83 @@ import ( "github.com/pkg/errors" vpp_dhcp "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp2001/dhcp" + "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp2001/interface_types" vpp_ifs "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp2001/interfaces" "go.ligato.io/vpp-agent/v3/plugins/vpp/ifplugin/vppcalls" ) -var InterfaceEventTimeout = time.Second +var ( + // EventDeliverTimeout defines maximum time to deliver event upstream. + EventDeliverTimeout = time.Second + // NotifChanBufferSize defines size of notification channel buffer. + NotifChanBufferSize = 10 +) -func (h *InterfaceVppHandler) WatchInterfaceEvents(events chan<- *vppcalls.InterfaceEvent) error { - notifChan := make(chan govppapi.Message, 10) +func (h *InterfaceVppHandler) WatchInterfaceEvents(ctx context.Context, eventsCh chan<- *vppcalls.InterfaceEvent) error { + notifChan := make(chan govppapi.Message, NotifChanBufferSize) - // subscribe for receiving SwInterfaceEvents notifications - vppNotifSubs, err := h.callsChannel.SubscribeNotification(notifChan, &vpp_ifs.SwInterfaceEvent{}) + // subscribe to SwInterfaceEvent notifications + sub, err := h.callsChannel.SubscribeNotification(notifChan, &vpp_ifs.SwInterfaceEvent{}) if err != nil { - return errors.Errorf("failed to subscribe VPP notification (sw_interface_event): %v", err) + return errors.Errorf("subscribing to VPP notification (sw_interface_event) failed: %v", err) + } + unsub := func() { + if err := sub.Unsubscribe(); err != nil { + h.log.Warnf("unsubscribing VPP notification (sw_interface_event) failed: %v", err) + } } - _ = vppNotifSubs go func() { + h.log.Debugf("start watching interface events") + defer h.log.Debugf("done watching interface events (%v)", ctx.Err()) + for { select { - case e, ok := <-notifChan: - if !ok { - h.log.Debugf("interface notification channel was closed") + case e, open := <-notifChan: + if !open { + h.log.Debugf("interface events channel was closed") + unsub() return } + ifEvent, ok := e.(*vpp_ifs.SwInterfaceEvent) if !ok { + h.log.Debugf("unexpected notification type: %#v", ifEvent) continue } - event := &vppcalls.InterfaceEvent{ - SwIfIndex: uint32(ifEvent.SwIfIndex), - AdminState: boolToUint(ifEvent.Flags > 0), - LinkState: boolToUint(ifEvent.Flags > 1), - Deleted: ifEvent.Deleted, + + // try to send event + select { + case eventsCh <- toInterfaceEvent(ifEvent): + // sent ok + case <-ctx.Done(): + unsub() + return + default: + // channel full send event in goroutine for later processing + go func() { + select { + case eventsCh <- toInterfaceEvent(ifEvent): + // sent ok + case <-time.After(EventDeliverTimeout): + h.log.Warnf("unable to deliver interface event, dropping it: %+v", ifEvent) + } + }() } - // send event in goroutine for quick processing - go func() { - select { - case events <- event: - // sent ok - case <-time.After(InterfaceEventTimeout): - h.log.Warnf("unable to deliver interface event, dropping it") - } - }() + case <-ctx.Done(): + unsub() + return } } }() - // enable interface state notifications from VPP - wantIfEventsReply := &vpp_ifs.WantInterfaceEventsReply{} - err = h.callsChannel.SendRequest(&vpp_ifs.WantInterfaceEvents{ + // enable interface events from VPP + if _, err := h.interfaces.WantInterfaceEvents(ctx, &vpp_ifs.WantInterfaceEvents{ PID: uint32(os.Getpid()), EnableDisable: 1, - }).ReceiveReply(wantIfEventsReply) - if err != nil { - if err == govppapi.VPPApiError(govppapi.INVALID_REGISTRATION) { - h.log.Warnf("already registered for watch interface events: %v", err) + }); err != nil { + if errors.Is(err, govppapi.VPPApiError(govppapi.INVALID_REGISTRATION)) { + h.log.Warnf("already subscribed to interface events: %v", err) return nil } return errors.Errorf("failed to watch interface events: %v", err) @@ -88,37 +110,90 @@ func (h *InterfaceVppHandler) WatchInterfaceEvents(events chan<- *vppcalls.Inter return nil } -func (h *InterfaceVppHandler) WatchDHCPLeases(leasesCh chan<- *vppcalls.Lease) error { - notifChan := make(chan govppapi.Message) +func (h *InterfaceVppHandler) WatchDHCPLeases(ctx context.Context, leasesCh chan<- *vppcalls.Lease) error { + notifChan := make(chan govppapi.Message, NotifChanBufferSize) - // subscribe for receiving SwInterfaceEvents notifications - vppNotifSubs, err := h.callsChannel.SubscribeNotification(notifChan, &vpp_dhcp.DHCPComplEvent{}) + // subscribe for receiving DHCPComplEvent notifications + sub, err := h.callsChannel.SubscribeNotification(notifChan, &vpp_dhcp.DHCPComplEvent{}) if err != nil { - return errors.Errorf("failed to subscribe VPP notification (sw_interface_event): %v", err) + return errors.Errorf("subscribing to VPP notification (dhcp_compl_event) failed: %v", err) + } + unsub := func() { + if err := sub.Unsubscribe(); err != nil { + h.log.Warnf("unsubscribing VPP notification (dhcp_compl_event) failed: %v", err) + } } - _ = vppNotifSubs go func() { + h.log.Debugf("start watching DHCP leases") + defer h.log.Debugf("done watching DHCP lease", ctx.Err()) + for { select { - case e := <-notifChan: + case e, open := <-notifChan: + if !open { + h.log.Debugf("interface notification channel was closed") + unsub() + return + } + dhcpEvent, ok := e.(*vpp_dhcp.DHCPComplEvent) if !ok { + h.log.Debugf("unexpected notification type: %#v", dhcpEvent) continue } - lease := dhcpEvent.Lease - leasesCh <- &vppcalls.Lease{ - SwIfIndex: uint32(lease.SwIfIndex), - State: uint8(lease.State), - Hostname: strings.TrimRight(lease.Hostname, "\x00"), - IsIPv6: lease.IsIPv6, - HostAddress: dhcpAddressToString(lease.HostAddress, uint32(lease.MaskWidth), lease.IsIPv6), - RouterAddress: dhcpAddressToString(lease.RouterAddress, uint32(lease.MaskWidth), lease.IsIPv6), - HostMac: net.HardwareAddr(lease.HostMac[:]).String(), + + // try to send event + select { + case leasesCh <- toDHCPLease(dhcpEvent): + // sent ok + case <-ctx.Done(): + unsub() + return + default: + // channel full send event in goroutine for later processing + go func() { + select { + case leasesCh <- toDHCPLease(dhcpEvent): + // sent ok + case <-time.After(EventDeliverTimeout): + h.log.Warnf("unable to deliver DHCP lease event, dropping it: %+v", dhcpEvent) + } + }() } + case <-ctx.Done(): + unsub() + return } } }() return nil } + +func toInterfaceEvent(ifEvent *vpp_ifs.SwInterfaceEvent) *vppcalls.InterfaceEvent { + event := &vppcalls.InterfaceEvent{ + SwIfIndex: uint32(ifEvent.SwIfIndex), + Deleted: ifEvent.Deleted, + } + if ifEvent.Flags&interface_types.IF_STATUS_API_FLAG_ADMIN_UP == interface_types.IF_STATUS_API_FLAG_ADMIN_UP { + event.AdminState = 1 + } + if ifEvent.Flags&interface_types.IF_STATUS_API_FLAG_LINK_UP == interface_types.IF_STATUS_API_FLAG_LINK_UP { + event.LinkState = 1 + } + return event +} + +func toDHCPLease(dhcpEvent *vpp_dhcp.DHCPComplEvent) *vppcalls.Lease { + lease := dhcpEvent.Lease + return &vppcalls.Lease{ + SwIfIndex: uint32(lease.SwIfIndex), + State: uint8(lease.State), + Hostname: strings.TrimRight(lease.Hostname, "\x00"), + IsIPv6: lease.IsIPv6, + HostAddress: dhcpAddressToString(lease.HostAddress, uint32(lease.MaskWidth), lease.IsIPv6), + RouterAddress: dhcpAddressToString(lease.RouterAddress, uint32(lease.MaskWidth), lease.IsIPv6), + HostMac: net.HardwareAddr(lease.HostMac[:]).String(), + } +} diff --git a/plugins/vpp/ifplugin/vppcalls/vpp2001/watch_vppcalls_test.go b/plugins/vpp/ifplugin/vppcalls/vpp2001/watch_vppcalls_test.go index 2c89ea0090..40e707446e 100644 --- a/plugins/vpp/ifplugin/vppcalls/vpp2001/watch_vppcalls_test.go +++ b/plugins/vpp/ifplugin/vppcalls/vpp2001/watch_vppcalls_test.go @@ -18,12 +18,11 @@ import ( "net" "testing" - "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp2001/interfaces" + . "github.com/onsi/gomega" "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp2001/dhcp" + "go.ligato.io/vpp-agent/v3/plugins/vpp/binapi/vpp2001/interfaces" "go.ligato.io/vpp-agent/v3/plugins/vpp/ifplugin/vppcalls" - - . "github.com/onsi/gomega" ) func TestWatchInterfaceEvents(t *testing.T) { @@ -31,14 +30,14 @@ func TestWatchInterfaceEvents(t *testing.T) { defer ctx.TeardownTestCtx() ctx.MockVpp.MockReply(&interfaces.WantInterfaceEventsReply{}) eventsChan := make(chan *vppcalls.InterfaceEvent) - err := ifHandler.WatchInterfaceEvents(eventsChan) + err := ifHandler.WatchInterfaceEvents(ctx.Context, eventsChan) notifChan := ctx.MockChannel.GetChannel() Expect(notifChan).ToNot(BeNil()) Expect(err).To(BeNil()) notifChan <- &interfaces.SwInterfaceEvent{ SwIfIndex: 1, - Flags: 2, + Flags: 3, Deleted: true, } var result *vppcalls.InterfaceEvent @@ -61,7 +60,7 @@ func TestWatchInterfaceEvents(t *testing.T) { notifChan <- &interfaces.SwInterfaceEvent{ SwIfIndex: 3, - Flags: 2, + Flags: 3, Deleted: false, } result = &vppcalls.InterfaceEvent{} @@ -80,7 +79,7 @@ func TestWatchDHCPLeases(t *testing.T) { ctx, ifHandler := ifTestSetup(t) defer ctx.TeardownTestCtx() leasesChChan := make(chan *vppcalls.Lease) - err := ifHandler.WatchDHCPLeases(leasesChChan) + err := ifHandler.WatchDHCPLeases(ctx.Context, leasesChChan) notifChan := ctx.MockChannel.GetChannel() Expect(notifChan).ToNot(BeNil()) Expect(err).To(BeNil()) From 6ca4cf8b1a026e6c4e60a55f7754a8d02b269bb2 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Fri, 13 Mar 2020 12:44:13 +0100 Subject: [PATCH 3/3] Add OnReconnect method to vpp.Client interface Signed-off-by: Ondrej Fabry --- plugins/vpp/ifplugin/descriptor/dhcp.go | 6 +-- plugins/vpp/ifplugin/descriptor/interface.go | 3 +- .../vpp/ifplugin/descriptor/interface_crud.go | 4 +- .../vpp/ifplugin/descriptor/interface_vrf.go | 5 +- plugins/vpp/ifplugin/ifplugin.go | 4 +- plugins/vpp/ifplugin/interface_state.go | 49 +++++++++---------- plugins/vpp/vpp.go | 2 + plugins/vpp/vppmock/vppmock.go | 12 ++++- tests/integration/vpp/integration_test.go | 10 ++-- 9 files changed, 52 insertions(+), 43 deletions(-) diff --git a/plugins/vpp/ifplugin/descriptor/dhcp.go b/plugins/vpp/ifplugin/descriptor/dhcp.go index 357205b229..ca60d773fc 100644 --- a/plugins/vpp/ifplugin/descriptor/dhcp.go +++ b/plugins/vpp/ifplugin/descriptor/dhcp.go @@ -218,8 +218,8 @@ func (d *DHCPDescriptor) watchDHCPNotifications(ctx context.Context) { defer d.wg.Done() d.log.Debug("Started watcher on DHCP notifications") - dhcpChan := make(chan *vppcalls.Lease) - if err := d.ifHandler.WatchDHCPLeases(dhcpChan); err != nil { + dhcpChan := make(chan *vppcalls.Lease, 10) + if err := d.ifHandler.WatchDHCPLeases(ctx, dhcpChan); err != nil { d.log.Errorf("watching dhcp leases failed: %v", err) return } @@ -227,7 +227,7 @@ func (d *DHCPDescriptor) watchDHCPNotifications(ctx context.Context) { for { select { case lease := <-dhcpChan: - // interface logical name + // Get interface logical name ifName, _, found := d.ifIndex.LookupBySwIfIndex(lease.SwIfIndex) if !found { d.log.Warnf("Interface sw_if_index=%d with DHCP lease was not found in the mapping", lease.SwIfIndex) diff --git a/plugins/vpp/ifplugin/descriptor/interface.go b/plugins/vpp/ifplugin/descriptor/interface.go index a93b2fe6fc..8c37891afa 100644 --- a/plugins/vpp/ifplugin/descriptor/interface.go +++ b/plugins/vpp/ifplugin/descriptor/interface.go @@ -293,8 +293,7 @@ func (d *InterfaceDescriptor) EquivalentInterfaces(key string, oldIntf, newIntf } // compare MAC addresses case-insensitively (also handle unspecified MAC address) - if newIntf.PhysAddress != "" && - strings.ToLower(oldIntf.PhysAddress) != strings.ToLower(newIntf.PhysAddress) { + if newIntf.PhysAddress != "" && !strings.EqualFold(oldIntf.PhysAddress, newIntf.PhysAddress) { return false } diff --git a/plugins/vpp/ifplugin/descriptor/interface_crud.go b/plugins/vpp/ifplugin/descriptor/interface_crud.go index bc101559e7..5b48b0a668 100644 --- a/plugins/vpp/ifplugin/descriptor/interface_crud.go +++ b/plugins/vpp/ifplugin/descriptor/interface_crud.go @@ -4,6 +4,7 @@ import ( "context" "github.com/pkg/errors" + "go.ligato.io/vpp-agent/v3/plugins/vpp" "go.ligato.io/vpp-agent/v3/pkg/models" @@ -61,8 +62,9 @@ func (d *InterfaceDescriptor) Create(key string, intf *interfaces.Interface) (me d.log.Error(err) return nil, err } - ifIdx, err = d.ifHandler.AddMemifInterface(context.TODO(), intf.Name, intf.GetMemif(), socketID) + ifIdx, err = d.ifHandler.AddMemifInterface(ctx, intf.Name, intf.GetMemif(), socketID) if err != nil { + err := errors.WithMessagef(err, "adding memif interface %s (socketID: %d) failed", intf.Name, socketID) d.log.Error(err) return nil, err } diff --git a/plugins/vpp/ifplugin/descriptor/interface_vrf.go b/plugins/vpp/ifplugin/descriptor/interface_vrf.go index dcc7c1a44c..9ca1925354 100644 --- a/plugins/vpp/ifplugin/descriptor/interface_vrf.go +++ b/plugins/vpp/ifplugin/descriptor/interface_vrf.go @@ -70,10 +70,7 @@ func (d *InterfaceVrfDescriptor) IsInterfaceVrfKey(key string) bool { return true } _, _, isIfaceInherVrfKey := interfaces.ParseInterfaceInheritedVrfKey(key) - if isIfaceInherVrfKey { - return true - } - return false + return isIfaceInherVrfKey } // Create puts interface into the given VRF table. diff --git a/plugins/vpp/ifplugin/ifplugin.go b/plugins/vpp/ifplugin/ifplugin.go index 412038fbb8..81dd9a2d4f 100644 --- a/plugins/vpp/ifplugin/ifplugin.go +++ b/plugins/vpp/ifplugin/ifplugin.go @@ -76,7 +76,6 @@ type IfPlugin struct { // descriptors linkStateDescriptor *descriptor.LinkStateDescriptor dhcpDescriptor *descriptor.DHCPDescriptor - spanDescriptor *descriptor.SpanDescriptor // from config file defaultMtu uint32 @@ -243,8 +242,7 @@ func (p *IfPlugin) Init() (err error) { } } - err = p.ifStateUpdater.Init(p.ctx, p.Log, p.KVScheduler, p.VPP, p.intfIndex, - ifNotifHandler, p.publishStats) + err = p.ifStateUpdater.Init(p.ctx, p.Log, p.KVScheduler, p.VPP, p.intfIndex, ifNotifHandler, p.publishStats) if err != nil { return err } diff --git a/plugins/vpp/ifplugin/interface_state.go b/plugins/vpp/ifplugin/interface_state.go index 1a53a14e16..8f5538932e 100644 --- a/plugins/vpp/ifplugin/interface_state.go +++ b/plugins/vpp/ifplugin/interface_state.go @@ -50,8 +50,9 @@ type InterfaceStateUpdater struct { ifMetaChan chan ifaceidx.IfaceMetadataDto - ifHandler vppcalls.InterfaceVppAPI - ifEvents chan *vppcalls.InterfaceEvent + ifHandler vppcalls.InterfaceVppAPI + ifEvents chan *vppcalls.InterfaceEvent + cancelIfEvents func() ifsForUpdate map[uint32]struct{} lastIfCounters map[uint32]govppapi.InterfaceCounters @@ -60,6 +61,7 @@ type InterfaceStateUpdater struct { lastIfNotif time.Time lastIfMeta time.Time + ctx context.Context cancel context.CancelFunc // cancel can be used to cancel all goroutines and their jobs inside of the plugin wg sync.WaitGroup // wait group that allows to wait until all goroutines of the plugin have finished } @@ -96,26 +98,25 @@ func (c *InterfaceStateUpdater) Init( c.ifEvents = make(chan *vppcalls.InterfaceEvent, 1000) // Create child context - var childCtx context.Context - childCtx, c.cancel = context.WithCancel(ctx) + c.ctx, c.cancel = context.WithCancel(ctx) // Watch for incoming notifications c.wg.Add(1) - go c.watchVPPNotifications(childCtx) + go c.watchVPPNotifications(c.ctx) // Periodically read VPP counters and combined counters for VPP statistics if disableInterfaceStats { c.log.Warnf("reading interface stats is DISABLED!") } else if readCounters { c.wg.Add(1) - go c.startReadingCounters(childCtx) + go c.startReadingCounters(c.ctx) } if disableStatusPublishing { c.log.Warnf("publishing interface status is DISABLED!") } else { c.wg.Add(1) - go c.startUpdatingIfStateDetails(childCtx) + go c.startUpdatingIfStateDetails(c.ctx) } return nil @@ -123,19 +124,15 @@ func (c *InterfaceStateUpdater) Init( // AfterInit subscribes for watching VPP notifications on previously initialized channel func (c *InterfaceStateUpdater) AfterInit() error { - err := c.subscribeVPPNotifications() - if err != nil { + if err := c.subscribeVPPNotifications(c.ctx); err != nil { return err } - return nil -} - -// subscribeVPPNotifications subscribes for interface state notifications from VPP. -func (c *InterfaceStateUpdater) subscribeVPPNotifications() error { - if err := c.ifHandler.WatchInterfaceEvents(c.ifEvents); err != nil { - return err - } - + c.vppClient.OnReconnect(func() { + c.cancelIfEvents() + if err := c.subscribeVPPNotifications(c.ctx); err != nil { + c.log.Warnf("WatchInterfaceEvents failed: %v", err) + } + }) return nil } @@ -143,14 +140,15 @@ func (c *InterfaceStateUpdater) subscribeVPPNotifications() error { func (c *InterfaceStateUpdater) Close() error { c.cancel() c.wg.Wait() + return nil +} - // TODO: handle unsubscribing - /*if c.vppNotifSubs != nil { - if err := c.vppNotifSubs.Unsubscribe(); err != nil { - return errors.Errorf("failed to unsubscribe interface state notification on close: %v", err) - } - }*/ - +// subscribeVPPNotifications subscribes for interface state notifications from VPP. +func (c *InterfaceStateUpdater) subscribeVPPNotifications(ctx context.Context) error { + ctx, c.cancelIfEvents = context.WithCancel(ctx) + if err := c.ifHandler.WatchInterfaceEvents(ctx, c.ifEvents); err != nil { + return err + } return nil } @@ -387,7 +385,6 @@ func (c *InterfaceStateUpdater) getIfStateDataWLookup(ifIdx uint32) (*intf.Inter Name: ifName, Statistics: &intf.InterfaceState_Statistics{}, } - c.ifState[ifIdx] = ifState found = true } diff --git a/plugins/vpp/vpp.go b/plugins/vpp/vpp.go index 85e0f5a71f..cb6b4b73a3 100644 --- a/plugins/vpp/vpp.go +++ b/plugins/vpp/vpp.go @@ -45,4 +45,6 @@ type Client interface { IsPluginLoaded(plugin string) bool // BinapiVersion returns preferred binapi version. BinapiVersion() Version + // OnReconnect registers handler function to be executed on reconnect. + OnReconnect(h func()) } diff --git a/plugins/vpp/vppmock/vppmock.go b/plugins/vpp/vppmock/vppmock.go index 8c950938c8..393e2ddc62 100644 --- a/plugins/vpp/vppmock/vppmock.go +++ b/plugins/vpp/vppmock/vppmock.go @@ -113,13 +113,19 @@ func (m *mockedChannel) SendMultiRequest(msg govppapi.Message) govppapi.MultiReq func (m *mockedChannel) SubscribeNotification(notifChan chan govppapi.Message, event govppapi.Message) (govppapi.SubscriptionCtx, error) { m.channel = notifChan - return nil, nil + return &mockSubscription{}, nil } func (m *mockedChannel) GetChannel() chan govppapi.Message { return m.channel } +type mockSubscription struct{} + +func (s *mockSubscription) Unsubscribe() error { + return nil +} + type mockedContext struct { requestCtx govppapi.RequestCtx retErr error @@ -241,3 +247,7 @@ func (m *mockVPPClient) BinapiVersion() vpp.Version { func (m *mockVPPClient) Stats() govppapi.StatsProvider { panic("implement me") } + +func (m *mockVPPClient) OnReconnect(h func()) { + panic("implement me") +} diff --git a/tests/integration/vpp/integration_test.go b/tests/integration/vpp/integration_test.go index 1b418ecbd0..292efa4154 100644 --- a/tests/integration/vpp/integration_test.go +++ b/tests/integration/vpp/integration_test.go @@ -297,11 +297,11 @@ type vppClient struct { version vpp.Version } -func (m *vppClient) Version() vpp.Version { - return m.version +func (v *vppClient) Version() vpp.Version { + return v.version } -func (m *vppClient) BinapiVersion() vpp.Version { +func (v *vppClient) BinapiVersion() vpp.Version { return "" } @@ -326,3 +326,7 @@ func (v *vppClient) IsPluginLoaded(plugin string) bool { } return false } + +func (v *vppClient) OnReconnect(h func()) { + // no-op +}