Skip to content

Commit

Permalink
WIP: aahhhh need to refactor other stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
bewing committed Apr 24, 2022
1 parent e1aef5e commit 1ae4057
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 73 deletions.
239 changes: 171 additions & 68 deletions plugins/inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/google/gnxi/utils/xpath"
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
"google.golang.org/grpc"
Expand All @@ -30,9 +31,10 @@ import (

// gNMI plugin instance
type GNMI struct {
Addresses []string `toml:"addresses"`
Subscriptions []Subscription `toml:"subscription"`
Aliases map[string]string `toml:"aliases"`
Addresses []string `toml:"addresses"`
Subscriptions []Subscription `toml:"subscription"`
TagSubscriptions []TagSubscription `toml:"tag_subscription"`
Aliases map[string]string `toml:"aliases"`

// Optional subscription configuration
Encoding string
Expand All @@ -57,19 +59,24 @@ type GNMI struct {
acc telegraf.Accumulator
cancel context.CancelFunc
wg sync.WaitGroup
// Lookup/device+name/key/value
lookup map[string]map[string]map[string]interface{}
lookupMutex sync.Mutex
legacyTags bool

Log telegraf.Logger
}

type Worker struct {
address string
tagStore map[*gnmiLib.Path]map[string]interface{}
}

// Subscription for a gNMI client
type Subscription struct {
Name string
Origin string
Path string

fullPath *gnmiLib.Path

// Subscription mode and interval
SubscriptionMode string `toml:"subscription_mode"`
SampleInterval config.Duration `toml:"sample_interval"`
Expand All @@ -82,6 +89,12 @@ type Subscription struct {
TagOnly bool `toml:"tag_only"`
}

// Tag Subscription for a gNMI client
type TagSubscription struct {
Subscription
MatchKeys []string `toml:match_keys`
}

// Start the http listener service
func (c *GNMI) Start(acc telegraf.Accumulator) error {
var err error
Expand All @@ -90,9 +103,30 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
var request *gnmiLib.SubscribeRequest
c.acc = acc
ctx, c.cancel = context.WithCancel(context.Background())
c.lookupMutex.Lock()
c.lookup = make(map[string]map[string]map[string]interface{})
c.lookupMutex.Unlock()

for i := len(c.Subscriptions) - 1; i >= 0; i-- {
subscription := c.Subscriptions[i]
// Support legacy TagOnly subscriptions
if subscription.TagOnly {
tagSub := convertTagOnlySubscription(subscription)
c.TagSubscriptions = append(c.TagSubscriptions, tagSub)
// Remove from the original subscriptions list
c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...)
c.legacyTags = true
continue
}
if err = subscription.buildFullPath(c); err != nil {
return err
}
}
for idx := range c.TagSubscriptions {
if err = c.TagSubscriptions[idx].buildFullPath(c); err != nil {
return err
}
if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly {
return fmt.Errorf("do not mix legacu tag_only subscriptions and tag subscriptions")
}
}

// Validate configuration
if request, err = c.newSubscribeRequest(); err != nil {
Expand Down Expand Up @@ -143,13 +177,6 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
c.internalAliases[longPath] = name
c.internalAliases[shortPath] = name
}

if subscription.TagOnly {
// Create the top-level lookup for this tag
c.lookupMutex.Lock()
c.lookup[name] = make(map[string]map[string]interface{})
c.lookupMutex.Unlock()
}
}
for alias, encodingPath := range c.Aliases {
c.internalAliases[encodingPath] = alias
Expand All @@ -158,10 +185,12 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
// Create a goroutine for each device, dial and subscribe
c.wg.Add(len(c.Addresses))
for _, addr := range c.Addresses {
go func(address string) {
worker := Worker{address: addr}
worker.tagStore = newTagStore(c.TagSubscriptions)
go func(worker Worker) {
defer c.wg.Done()
for ctx.Err() == nil {
if err := c.subscribeGNMI(ctx, address, tlscfg, request); err != nil && ctx.Err() == nil {
if err := c.subscribeGNMI(ctx, &worker, tlscfg, request); err != nil && ctx.Err() == nil {
acc.AddError(err)
}

Expand All @@ -170,30 +199,42 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
case <-time.After(time.Duration(c.Redial)):
}
}
}(addr)
}(worker)
}
return nil
}

func (s *Subscription) buildSubscription() (*gnmiLib.Subscription, error) {
gnmiPath, err := parsePath(s.Origin, s.Path, "")
if err != nil {
return nil, err
}
mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(s.SubscriptionMode)]
if !ok {
return nil, fmt.Errorf("invalid subscription mode %s", s.SubscriptionMode)
}
return &gnmiLib.Subscription{
Path: gnmiPath,
Mode: gnmiLib.SubscriptionMode(mode),
HeartbeatInterval: uint64(time.Duration(s.HeartbeatInterval).Nanoseconds()),
SampleInterval: uint64(time.Duration(s.SampleInterval).Nanoseconds()),
SuppressRedundant: s.SuppressRedundant,
}, nil
}

// Create a new gNMI SubscribeRequest
func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) {
// Create subscription objects
subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions))
for i, subscription := range c.Subscriptions {
gnmiPath, err := parsePath(subscription.Origin, subscription.Path, "")
if err != nil {
var err error
subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions)+len(c.TagSubscriptions))
for i, subscription := range c.TagSubscriptions {
if subscriptions[i], err = subscription.buildSubscription(); err != nil {
return nil, err
}
mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(subscription.SubscriptionMode)]
if !ok {
return nil, fmt.Errorf("invalid subscription mode %s", subscription.SubscriptionMode)
}
subscriptions[i] = &gnmiLib.Subscription{
Path: gnmiPath,
Mode: gnmiLib.SubscriptionMode(mode),
SampleInterval: uint64(time.Duration(subscription.SampleInterval).Nanoseconds()),
SuppressRedundant: subscription.SuppressRedundant,
HeartbeatInterval: uint64(time.Duration(subscription.HeartbeatInterval).Nanoseconds()),
}
for i, subscription := range c.Subscriptions {
if subscriptions[i+len(c.TagSubscriptions)], err = subscription.buildSubscription(); err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -221,15 +262,15 @@ func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) {
}

// SubscribeGNMI and extract telemetry data
func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error {
func (c *GNMI) subscribeGNMI(ctx context.Context, worker *Worker, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error {
var opt grpc.DialOption
if tlscfg != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlscfg))
} else {
opt = grpc.WithInsecure()
}

client, err := grpc.DialContext(ctx, address, opt)
client, err := grpc.DialContext(ctx, worker.address, opt)
if err != nil {
return fmt.Errorf("failed to dial: %v", err)
}
Expand All @@ -248,8 +289,8 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co
}
}

c.Log.Debugf("Connection to gNMI device %s established", address)
defer c.Log.Debugf("Connection to gNMI device %s closed", address)
c.Log.Debugf("Connection to gNMI device %s established", worker.address)
defer c.Log.Debugf("Connection to gNMI device %s closed", worker.address)
for ctx.Err() == nil {
var reply *gnmiLib.SubscribeResponse
if reply, err = subscribeClient.Recv(); err != nil {
Expand All @@ -259,22 +300,22 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co
break
}

c.handleSubscribeResponse(address, reply)
c.handleSubscribeResponse(worker, reply)
}
return nil
}

func (c *GNMI) handleSubscribeResponse(address string, reply *gnmiLib.SubscribeResponse) {
func (c *GNMI) handleSubscribeResponse(worker *Worker, reply *gnmiLib.SubscribeResponse) {
switch response := reply.Response.(type) {
case *gnmiLib.SubscribeResponse_Update:
c.handleSubscribeResponseUpdate(address, response)
c.handleSubscribeResponseUpdate(worker, response)
case *gnmiLib.SubscribeResponse_Error:
c.Log.Errorf("Subscribe error (%d), %q", response.Error.Code, response.Error.Message)
}
}

// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data
func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.SubscribeResponse_Update) {
func (c *GNMI) handleSubscribeResponseUpdate(worker *Worker, response *gnmiLib.SubscribeResponse_Update) {
var prefix, prefixAliasPath string
grouper := metric.NewSeriesGrouper()
timestamp := time.Unix(0, response.Update.Timestamp)
Expand All @@ -286,19 +327,38 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
c.Log.Errorf("handling path %q failed: %v", response.Update.Prefix, err)
}
}
prefixTags["source"], _, _ = net.SplitHostPort(address)
prefixTags["source"], _, _ = net.SplitHostPort(worker.address)
prefixTags["path"] = prefix

// Parse individual Update message and create measurements
var name, lastAliasPath string
for _, update := range response.Update.Update {
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)

// Prepare tags from prefix
tags := make(map[string]string, len(prefixTags))
for key, val := range prefixTags {
tags[key] = val
}
aliasPath, fields := c.handleTelemetryField(update, tags, prefix)

// Intercept and store tag-only subscriptions
for _, tagSub := range c.TagSubscriptions {
if equalPathNoKeys(fullPath, tagSub.fullPath) {
worker.tagStore[fullPath] = fields
return
}
}

if c.legacyTags {
if tagOnlyTags := worker.checkLegacyTags(fullPath); tagOnlyTags != nil {
for k, v := range tagOnlyTags {
tags[k] = fmt.Sprint(v)
spew.Dump(aliasPath)
}
}
}

// Inherent valid alias from prefix parsing
if len(prefixAliasPath) > 0 && len(aliasPath) == 0 {
aliasPath = prefixAliasPath
Expand All @@ -314,32 +374,6 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
}
}

// Update tag lookups and discard rest of update
subscriptionKey := tags["source"] + "/" + tags["name"]
c.lookupMutex.Lock()
if _, ok := c.lookup[name]; ok {
// We are subscribed to this, so add the fields to the lookup-table
if _, ok := c.lookup[name][subscriptionKey]; !ok {
c.lookup[name][subscriptionKey] = make(map[string]interface{})
}
for k, v := range fields {
c.lookup[name][subscriptionKey][path.Base(k)] = v
}
c.lookupMutex.Unlock()
// Do not process the data further as we only subscribed here for the lookup table
continue
}

// Apply lookups if present
for subscriptionName, values := range c.lookup {
if annotations, ok := values[subscriptionKey]; ok {
for k, v := range annotations {
tags[subscriptionName+"/"+k] = fmt.Sprint(v)
}
}
}
c.lookupMutex.Unlock()

// Group metrics
for k, v := range fields {
key := k
Expand Down Expand Up @@ -511,3 +545,72 @@ func init() {
// Backwards compatible alias:
inputs.Add("cisco_telemetry_gnmi", New)
}

func convertTagOnlySubscription(s Subscription) TagSubscription {
t := TagSubscription{Subscription: s}
return t
}

func newTagStore(ts []TagSubscription) map[*gnmiLib.Path]map[string]interface{} {
t := make(map[*gnmiLib.Path]map[string]interface{})
return t
}

// equalPathNoKeys checks if two gNMI paths are equal, without keys
func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool {
if len(a.Elem) != len(b.Elem) {
return false
}
for i := range a.Elem {
if a.Elem[i].Name != b.Elem[i].Name {
return false
}
}
return true
}

func pathWithPrefix(prefix *gnmiLib.Path, path *gnmiLib.Path) *gnmiLib.Path {
fullPath := new(gnmiLib.Path)
fullPath.Origin = prefix.Origin
fullPath.Target = prefix.Target
fullPath.Elem = append(prefix.Elem, path.Elem...)
return fullPath
}

func (s *Subscription) buildFullPath(c *GNMI) error {
var err error
if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil {
return err
}
s.fullPath.Origin = s.Origin
s.fullPath.Target = c.Target
if c.Prefix != "" {
prefix, err := xpath.ToGNMIPath(c.Prefix)
if err != nil {
return err
}
s.fullPath.Elem = append(prefix.Elem, s.fullPath.Elem...)
if s.Origin == "" && c.Origin != "" {
s.fullPath.Origin = c.Origin
}
}
return nil
}

func (w *Worker) checkLegacyTags(fullPath *gnmiLib.Path) map[string]interface{} {
for idx := range fullPath.Elem {
// Match the first "name" key in the path
if v, ok := fullPath.Elem[idx].Key["name"]; ok {
// Look for the same "name" key in the store
for path := range w.tagStore {
for _, elem := range path.Elem {
if match := elem.Key["name"]; match == v {
return w.tagStore[path]
}

}
}
}
}
return nil
}
Loading

0 comments on commit 1ae4057

Please sign in to comment.