diff --git a/plugins/inputs/gnmi/gnmi.go b/plugins/inputs/gnmi/gnmi.go index 42754348e8c4b..690ae938f1533 100644 --- a/plugins/inputs/gnmi/gnmi.go +++ b/plugins/inputs/gnmi/gnmi.go @@ -14,6 +14,7 @@ import ( "sync" "time" + "github.com/davecgh/go-spew/spew" "github.com/google/gnxi/utils/xpath" gnmiLib "github.com/openconfig/gnmi/proto/gnmi" "google.golang.org/grpc" @@ -30,9 +31,10 @@ import ( // gNMI plugin instance type GNMI struct { - Addresses []string `toml:"addresses"` - Subscriptions []Subscription `toml:"subscription"` - Aliases map[string]string `toml:"aliases"` + Addresses []string `toml:"addresses"` + Subscriptions []Subscription `toml:"subscription"` + TagSubscriptions []TagSubscription `toml:"tag_subscription"` + Aliases map[string]string `toml:"aliases"` // Optional subscription configuration Encoding string @@ -57,19 +59,24 @@ type GNMI struct { acc telegraf.Accumulator cancel context.CancelFunc wg sync.WaitGroup - // Lookup/device+name/key/value - lookup map[string]map[string]map[string]interface{} - lookupMutex sync.Mutex + legacyTags bool Log telegraf.Logger } +type Worker struct { + address string + tagStore map[*gnmiLib.Path]map[string]interface{} +} + // Subscription for a gNMI client type Subscription struct { Name string Origin string Path string + fullPath *gnmiLib.Path + // Subscription mode and interval SubscriptionMode string `toml:"subscription_mode"` SampleInterval config.Duration `toml:"sample_interval"` @@ -82,6 +89,12 @@ type Subscription struct { TagOnly bool `toml:"tag_only"` } +// Tag Subscription for a gNMI client +type TagSubscription struct { + Subscription + MatchKeys []string `toml:match_keys` +} + // Start the http listener service func (c *GNMI) Start(acc telegraf.Accumulator) error { var err error @@ -90,9 +103,30 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { var request *gnmiLib.SubscribeRequest c.acc = acc ctx, c.cancel = context.WithCancel(context.Background()) - c.lookupMutex.Lock() - c.lookup = make(map[string]map[string]map[string]interface{}) - c.lookupMutex.Unlock() + + for i := len(c.Subscriptions) - 1; i >= 0; i-- { + subscription := c.Subscriptions[i] + // Support legacy TagOnly subscriptions + if subscription.TagOnly { + tagSub := convertTagOnlySubscription(subscription) + c.TagSubscriptions = append(c.TagSubscriptions, tagSub) + // Remove from the original subscriptions list + c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...) + c.legacyTags = true + continue + } + if err = subscription.buildFullPath(c); err != nil { + return err + } + } + for idx := range c.TagSubscriptions { + if err = c.TagSubscriptions[idx].buildFullPath(c); err != nil { + return err + } + if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly { + return fmt.Errorf("do not mix legacu tag_only subscriptions and tag subscriptions") + } + } // Validate configuration if request, err = c.newSubscribeRequest(); err != nil { @@ -143,13 +177,6 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { c.internalAliases[longPath] = name c.internalAliases[shortPath] = name } - - if subscription.TagOnly { - // Create the top-level lookup for this tag - c.lookupMutex.Lock() - c.lookup[name] = make(map[string]map[string]interface{}) - c.lookupMutex.Unlock() - } } for alias, encodingPath := range c.Aliases { c.internalAliases[encodingPath] = alias @@ -158,10 +185,12 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { // Create a goroutine for each device, dial and subscribe c.wg.Add(len(c.Addresses)) for _, addr := range c.Addresses { - go func(address string) { + worker := Worker{address: addr} + worker.tagStore = newTagStore(c.TagSubscriptions) + go func(worker Worker) { defer c.wg.Done() for ctx.Err() == nil { - if err := c.subscribeGNMI(ctx, address, tlscfg, request); err != nil && ctx.Err() == nil { + if err := c.subscribeGNMI(ctx, &worker, tlscfg, request); err != nil && ctx.Err() == nil { acc.AddError(err) } @@ -170,30 +199,42 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { case <-time.After(time.Duration(c.Redial)): } } - }(addr) + }(worker) } return nil } +func (s *Subscription) buildSubscription() (*gnmiLib.Subscription, error) { + gnmiPath, err := parsePath(s.Origin, s.Path, "") + if err != nil { + return nil, err + } + mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(s.SubscriptionMode)] + if !ok { + return nil, fmt.Errorf("invalid subscription mode %s", s.SubscriptionMode) + } + return &gnmiLib.Subscription{ + Path: gnmiPath, + Mode: gnmiLib.SubscriptionMode(mode), + HeartbeatInterval: uint64(time.Duration(s.HeartbeatInterval).Nanoseconds()), + SampleInterval: uint64(time.Duration(s.SampleInterval).Nanoseconds()), + SuppressRedundant: s.SuppressRedundant, + }, nil +} + // Create a new gNMI SubscribeRequest func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) { // Create subscription objects - subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions)) - for i, subscription := range c.Subscriptions { - gnmiPath, err := parsePath(subscription.Origin, subscription.Path, "") - if err != nil { + var err error + subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions)+len(c.TagSubscriptions)) + for i, subscription := range c.TagSubscriptions { + if subscriptions[i], err = subscription.buildSubscription(); err != nil { return nil, err } - mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(subscription.SubscriptionMode)] - if !ok { - return nil, fmt.Errorf("invalid subscription mode %s", subscription.SubscriptionMode) - } - subscriptions[i] = &gnmiLib.Subscription{ - Path: gnmiPath, - Mode: gnmiLib.SubscriptionMode(mode), - SampleInterval: uint64(time.Duration(subscription.SampleInterval).Nanoseconds()), - SuppressRedundant: subscription.SuppressRedundant, - HeartbeatInterval: uint64(time.Duration(subscription.HeartbeatInterval).Nanoseconds()), + } + for i, subscription := range c.Subscriptions { + if subscriptions[i+len(c.TagSubscriptions)], err = subscription.buildSubscription(); err != nil { + return nil, err } } @@ -221,7 +262,7 @@ func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) { } // SubscribeGNMI and extract telemetry data -func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error { +func (c *GNMI) subscribeGNMI(ctx context.Context, worker *Worker, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error { var opt grpc.DialOption if tlscfg != nil { opt = grpc.WithTransportCredentials(credentials.NewTLS(tlscfg)) @@ -229,7 +270,7 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co opt = grpc.WithInsecure() } - client, err := grpc.DialContext(ctx, address, opt) + client, err := grpc.DialContext(ctx, worker.address, opt) if err != nil { return fmt.Errorf("failed to dial: %v", err) } @@ -248,8 +289,8 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co } } - c.Log.Debugf("Connection to gNMI device %s established", address) - defer c.Log.Debugf("Connection to gNMI device %s closed", address) + c.Log.Debugf("Connection to gNMI device %s established", worker.address) + defer c.Log.Debugf("Connection to gNMI device %s closed", worker.address) for ctx.Err() == nil { var reply *gnmiLib.SubscribeResponse if reply, err = subscribeClient.Recv(); err != nil { @@ -259,22 +300,22 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co break } - c.handleSubscribeResponse(address, reply) + c.handleSubscribeResponse(worker, reply) } return nil } -func (c *GNMI) handleSubscribeResponse(address string, reply *gnmiLib.SubscribeResponse) { +func (c *GNMI) handleSubscribeResponse(worker *Worker, reply *gnmiLib.SubscribeResponse) { switch response := reply.Response.(type) { case *gnmiLib.SubscribeResponse_Update: - c.handleSubscribeResponseUpdate(address, response) + c.handleSubscribeResponseUpdate(worker, response) case *gnmiLib.SubscribeResponse_Error: c.Log.Errorf("Subscribe error (%d), %q", response.Error.Code, response.Error.Message) } } // Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data -func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.SubscribeResponse_Update) { +func (c *GNMI) handleSubscribeResponseUpdate(worker *Worker, response *gnmiLib.SubscribeResponse_Update) { var prefix, prefixAliasPath string grouper := metric.NewSeriesGrouper() timestamp := time.Unix(0, response.Update.Timestamp) @@ -286,12 +327,14 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S c.Log.Errorf("handling path %q failed: %v", response.Update.Prefix, err) } } - prefixTags["source"], _, _ = net.SplitHostPort(address) + prefixTags["source"], _, _ = net.SplitHostPort(worker.address) prefixTags["path"] = prefix // Parse individual Update message and create measurements var name, lastAliasPath string for _, update := range response.Update.Update { + fullPath := pathWithPrefix(response.Update.Prefix, update.Path) + // Prepare tags from prefix tags := make(map[string]string, len(prefixTags)) for key, val := range prefixTags { @@ -299,6 +342,23 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S } aliasPath, fields := c.handleTelemetryField(update, tags, prefix) + // Intercept and store tag-only subscriptions + for _, tagSub := range c.TagSubscriptions { + if equalPathNoKeys(fullPath, tagSub.fullPath) { + worker.tagStore[fullPath] = fields + return + } + } + + if c.legacyTags { + if tagOnlyTags := worker.checkLegacyTags(fullPath); tagOnlyTags != nil { + for k, v := range tagOnlyTags { + tags[k] = fmt.Sprint(v) + spew.Dump(aliasPath) + } + } + } + // Inherent valid alias from prefix parsing if len(prefixAliasPath) > 0 && len(aliasPath) == 0 { aliasPath = prefixAliasPath @@ -314,32 +374,6 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S } } - // Update tag lookups and discard rest of update - subscriptionKey := tags["source"] + "/" + tags["name"] - c.lookupMutex.Lock() - if _, ok := c.lookup[name]; ok { - // We are subscribed to this, so add the fields to the lookup-table - if _, ok := c.lookup[name][subscriptionKey]; !ok { - c.lookup[name][subscriptionKey] = make(map[string]interface{}) - } - for k, v := range fields { - c.lookup[name][subscriptionKey][path.Base(k)] = v - } - c.lookupMutex.Unlock() - // Do not process the data further as we only subscribed here for the lookup table - continue - } - - // Apply lookups if present - for subscriptionName, values := range c.lookup { - if annotations, ok := values[subscriptionKey]; ok { - for k, v := range annotations { - tags[subscriptionName+"/"+k] = fmt.Sprint(v) - } - } - } - c.lookupMutex.Unlock() - // Group metrics for k, v := range fields { key := k @@ -511,3 +545,72 @@ func init() { // Backwards compatible alias: inputs.Add("cisco_telemetry_gnmi", New) } + +func convertTagOnlySubscription(s Subscription) TagSubscription { + t := TagSubscription{Subscription: s} + return t +} + +func newTagStore(ts []TagSubscription) map[*gnmiLib.Path]map[string]interface{} { + t := make(map[*gnmiLib.Path]map[string]interface{}) + return t +} + +// equalPathNoKeys checks if two gNMI paths are equal, without keys +func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool { + if len(a.Elem) != len(b.Elem) { + return false + } + for i := range a.Elem { + if a.Elem[i].Name != b.Elem[i].Name { + return false + } + } + return true +} + +func pathWithPrefix(prefix *gnmiLib.Path, path *gnmiLib.Path) *gnmiLib.Path { + fullPath := new(gnmiLib.Path) + fullPath.Origin = prefix.Origin + fullPath.Target = prefix.Target + fullPath.Elem = append(prefix.Elem, path.Elem...) + return fullPath +} + +func (s *Subscription) buildFullPath(c *GNMI) error { + var err error + if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil { + return err + } + s.fullPath.Origin = s.Origin + s.fullPath.Target = c.Target + if c.Prefix != "" { + prefix, err := xpath.ToGNMIPath(c.Prefix) + if err != nil { + return err + } + s.fullPath.Elem = append(prefix.Elem, s.fullPath.Elem...) + if s.Origin == "" && c.Origin != "" { + s.fullPath.Origin = c.Origin + } + } + return nil +} + +func (w *Worker) checkLegacyTags(fullPath *gnmiLib.Path) map[string]interface{} { + for idx := range fullPath.Elem { + // Match the first "name" key in the path + if v, ok := fullPath.Elem[idx].Key["name"]; ok { + // Look for the same "name" key in the store + for path := range w.tagStore { + for _, elem := range path.Elem { + if match := elem.Key["name"]; match == v { + return w.tagStore[path] + } + + } + } + } + } + return nil +} diff --git a/plugins/inputs/gnmi/gnmi_test.go b/plugins/inputs/gnmi/gnmi_test.go index e2078a676f8ef..da3c776f4e80e 100644 --- a/plugins/inputs/gnmi/gnmi_test.go +++ b/plugins/inputs/gnmi/gnmi_test.go @@ -478,10 +478,10 @@ func TestNotification(t *testing.T) { testutil.MustMetric( "oc-intf-counters", map[string]string{ - "path": "", - "source": "127.0.0.1", - "name": "Ethernet1", - "oc-intf-desc/description": "foo", + "path": "", + "source": "127.0.0.1", + "name": "Ethernet1", + "/interfaces/interface/state/description": "foo", }, map[string]interface{}{ "in_broadcast_pkts": 42, @@ -544,7 +544,7 @@ func TestSubscribeResponseError(t *testing.T) { plugin := &GNMI{Log: ml} // TODO: FIX SA1019: gnmi.Error is deprecated: Do not use. errorResponse := &gnmiLib.SubscribeResponse_Error{Error: &gnmiLib.Error{Message: me, Code: mc}} - plugin.handleSubscribeResponse("127.0.0.1:0", &gnmiLib.SubscribeResponse{Response: errorResponse}) + plugin.handleSubscribeResponse(&Worker{address: "127.0.0.1:0"}, &gnmiLib.SubscribeResponse{Response: errorResponse}) require.NotEmpty(t, ml.lastFormat) require.Equal(t, []interface{}{mc, me}, ml.lastArgs) }