Skip to content

Commit

Permalink
Trace refactored (#116)
Browse files Browse the repository at this point in the history
* govpp trace refactored

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

* fix test

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

* simplify

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

* some fixes

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

* init records list out of the lock

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

* Added succeed/fail flag to traced message

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

* decrease the size of the trace prealocation

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

* make it safe to init trace during api calls

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

* Do not use trace lock if trace is disabled

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

* fix data race on trace nil check

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>

---------

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
Co-authored-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
  • Loading branch information
VladoLavor and sknat committed Feb 22, 2024
1 parent e9c9d31 commit 553f5ca
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 188 deletions.
21 changes: 11 additions & 10 deletions api/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,30 @@ import (
"time"
)

// Trace gives access to the API trace tool, capturing outcoming and incoming messages
// to and from GoVPP.
// Trace is the GoVPP utility tool capturing processed API messages. The trace is not operational
// by default.
// Enable trace for a given connection by calling `NewTrace(connection, size)`
type Trace interface {
// Enable allows to enable or disable API trace for a connection.
Enable(enable bool)

// GetRecords retrieves all messages collected (from all channels if they are used)
// since the point the trace was enabled or cleared.
// GetRecords returns all API messages from all channels captured since the trace
// was initialized or cleared up to the point of the call.
GetRecords() []*Record

// GetRecordsForChannel retrieves messages collected by the given channel since
// the point the trace was enabled or cleared.
// GetRecordsForChannel returns all API messages recorded by the given channel.
GetRecordsForChannel(chId uint16) []*Record

// Clear erases messages captured so far.
Clear()

// Close the tracer and release associated resources
Close()
}

// Record contains essential information about traced message, its timestamp and whether
// Record contains essential information about the traced message, its timestamp and whether
// the message was received or sent
type Record struct {
Message Message
Timestamp time.Time
IsReceived bool
ChannelID uint16
Succeeded bool
}
1 change: 0 additions & 1 deletion core/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func (req *requestCtx) ReceiveReply(msg api.Message) error {
} else if lastReplyReceived {
return errors.New("multipart reply recieved while a single reply expected")
}

return nil
}

Expand Down
30 changes: 3 additions & 27 deletions core/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type Connection struct {
msgControlPing api.Message
msgControlPingReply api.Message

apiTrace *trace // API tracer (disabled by default)
traceLock sync.Mutex
trace *Trace // API tracer (disabled by default)
}

type backgroundLoopStatus int
Expand Down Expand Up @@ -168,11 +169,7 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration,
subscriptions: make(map[uint16][]*subscriptionCtx),
msgControlPing: msgControlPing,
msgControlPingReply: msgControlPingReply,
apiTrace: &trace{
list: make([]*api.Record, 0),
mux: &sync.Mutex{},
},
channelIdPool: newIDPool(0x7fff),
channelIdPool: newIDPool(0x7fff),
}
c.channelPool = genericpool.New[*Channel](func() *Channel {
if isDebugOn(debugOptChannels) {
Expand Down Expand Up @@ -583,24 +580,3 @@ func (c *Connection) sendConnEvent(event ConnectionEvent) {
log.Warn("Connection state channel is full, discarding value.")
}
}

// Trace gives access to the API trace interface
func (c *Connection) Trace() api.Trace {
return c.apiTrace
}

// trace records api message
func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) {
if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 {
return
}
entry := &api.Record{
Message: msg,
Timestamp: t,
IsReceived: isReceived,
ChannelID: chId,
}
c.apiTrace.mux.Lock()
c.apiTrace.list = append(c.apiTrace.list, entry)
c.apiTrace.mux.Unlock()
}
57 changes: 47 additions & 10 deletions core/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,21 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
}

// send the request to VPP
t := time.Now()
err = c.vppClient.SendMsg(context, data)
if err != nil {
if err = func() (err error) {
timestamp, enabled := c.trace.registerNew()
err = c.vppClient.SendMsg(context, data)
if enabled {
c.traceLock.Lock()
defer c.traceLock.Unlock()
c.trace.send(&api.Record{
Message: req.msg,
Timestamp: timestamp,
ChannelID: ch.id,
Succeeded: err == nil,
})
}
return
}(); err != nil {
log.WithFields(logger.Fields{
"channel": ch.id,
"msg_id": msgID,
Expand All @@ -126,7 +138,6 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
}).Warnf("Unable to send message")
return err
}
c.trace(req.msg, ch.id, t, false)

if req.multi {
// send a control ping to determine end of the multipart response
Expand All @@ -143,16 +154,28 @@ func (c *Connection) processRequest(ch *Channel, req *vppRequest) error {
"data_len": len(pingData),
}).Debugf(" -> SEND MSG: %T", c.msgControlPing)
}

t = time.Now()
if err := c.vppClient.SendMsg(context, pingData); err != nil {
// send the control ping request to VPP
if err = func() (err error) {
timestamp, enabled := c.trace.registerNew()
err = c.vppClient.SendMsg(context, pingData)
if enabled {
c.traceLock.Lock()
defer c.traceLock.Unlock()
c.trace.send(&api.Record{
Message: c.msgControlPing,
Timestamp: timestamp,
ChannelID: ch.id,
Succeeded: err == nil,
})
}
return
}(); err != nil {
log.WithFields(logger.Fields{
"context": context,
"seq_num": req.seqNum,
"error": err,
}).Warnf("unable to send control ping")
}
c.trace(c.msgControlPing, ch.id, t, false)
}

return nil
Expand Down Expand Up @@ -188,11 +211,25 @@ func (c *Connection) msgCallback(msgID uint16, data []byte) {

// decode and trace the message
msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
if err = c.codec.DecodeMsg(data, msg); err != nil {
if err = func() (err error) {
timestamp, enabled := c.trace.registerNew()
err = c.codec.DecodeMsg(data, msg)
if enabled {
c.traceLock.Lock()
defer c.traceLock.Unlock()
c.trace.send(&api.Record{
Message: msg,
Timestamp: timestamp,
IsReceived: true,
ChannelID: chanID,
Succeeded: err == nil,
})
}
return
}(); err != nil {
log.WithField("msg", msg).Warnf("Unable to decode message: %v", err)
return
}
c.trace(msg, chanID, time.Now(), true)

if log.Level == logger.DebugLevel { // for performance reasons - logrus does some processing even if debugs are disabled
log.WithFields(logger.Fields{
Expand Down
109 changes: 81 additions & 28 deletions core/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,104 @@ import (
"go.fd.io/govpp/api"
"sort"
"sync"
"sync/atomic"
"time"
)

// trace is the API tracer object synchronizing and keeping recoded messages.
type trace struct {
list []*api.Record
mux *sync.Mutex
// default buffer size
const bufferSize = 100

isEnabled int32
// Trace is the default trace API implementation.
type Trace struct {
*sync.RWMutex
wg sync.WaitGroup

records []*api.Record
buffer chan *api.Record
index int

closeFunc func()
}

func (c *trace) Enable(enable bool) {
if enable && atomic.CompareAndSwapInt32(&c.isEnabled, 0, 1) {
log.Debugf("API trace enabled")
} else if atomic.CompareAndSwapInt32(&c.isEnabled, 1, 0) {
log.Debugf("API trace disabled")
// NewTrace initializes the trace object, always bound to a GoVPP connection.
// The size limits the number of records stored.
// Initializing a new trace for the same connection replaces the old one and
// discards all values already collected.
func NewTrace(c *Connection, size int) (t *Trace) {
t = &Trace{
RWMutex: &sync.RWMutex{},
records: make([]*api.Record, size),
buffer: make(chan *api.Record, bufferSize),
}
c.traceLock.Lock()
c.trace = t
c.traceLock.Unlock()
t.closeFunc = func() {
c.trace = nil // no more records
close(t.buffer)
}
go func() {
for {
record, ok := <-t.buffer
if !ok {
return
}
if t.index < len(t.records) {
t.Lock()
t.records[t.index] = record
t.index++
t.Unlock()
}
t.wg.Done()
}
}()
return
}

func (c *trace) GetRecords() (list []*api.Record) {
c.mux.Lock()
list = append(list, c.list...)
c.mux.Unlock()
func (t *Trace) GetRecords() (list []*api.Record) {
// it is supposed to wait until all API messages sent to the
// buffer are processed before returning the list
t.wg.Wait()
list = make([]*api.Record, t.index)
t.RLock()
copy(list, t.records[:t.index])
t.RUnlock()
sort.Slice(list, func(i, j int) bool {
return list[i].Timestamp.Before(list[j].Timestamp)
})
return list
}

func (c *trace) GetRecordsForChannel(chId uint16) (list []*api.Record) {
c.mux.Lock()
for _, entry := range c.list {
if entry.ChannelID == chId {
list = append(list, entry)
func (t *Trace) GetRecordsForChannel(chId uint16) (list []*api.Record) {
records := t.GetRecords()
for _, record := range records {
if record.ChannelID == chId {
list = append(list, record)
}
}
c.mux.Unlock()
sort.Slice(list, func(i, j int) bool {
return list[i].Timestamp.Before(list[j].Timestamp)
})
return list
}

func (c *trace) Clear() {
c.mux.Lock()
c.list = make([]*api.Record, 0)
c.mux.Unlock()
func (t *Trace) Clear() {
t.Lock()
t.records = make([]*api.Record, len(t.records))
t.index = 0
t.Unlock()
}

func (t *Trace) Close() {
t.closeFunc()
}

func (t *Trace) registerNew() (now time.Time, enabled bool) {
if t != nil {
t.wg.Add(1)
enabled = true
}
return time.Now(), enabled
}

func (t *Trace) send(record *api.Record) {
if t != nil {
t.buffer <- record
}
}
Loading

0 comments on commit 553f5ca

Please sign in to comment.