From c3cc6fc36ae1efcf42edec4fc859f50649559cc9 Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Tue, 17 Oct 2023 12:59:58 -0400 Subject: [PATCH 01/13] added a parser to parse data and send as metrics WAN-2496 #time 30m --- plugins/inputs/t128_tank/README.md | 4 +- plugins/inputs/t128_tank/t128_tank.go | 63 +++++++++++---------------- 2 files changed, 29 insertions(+), 38 deletions(-) diff --git a/plugins/inputs/t128_tank/README.md b/plugins/inputs/t128_tank/README.md index 2e6d1c9873a7e..c7f60f4824a6a 100644 --- a/plugins/inputs/t128_tank/README.md +++ b/plugins/inputs/t128_tank/README.md @@ -6,7 +6,6 @@ The tank input plugin collects data from a 128T. ```toml [[inputs.t128_tank]] -## A (unique) file to use for index tracking. ## This tracking allows each event to be produced once. # index_file = "" @@ -16,6 +15,9 @@ The tank input plugin collects data from a 128T. ## Port Number to get tank data from. # port_number = 11011 +## A field name to display index number +# sequence_number_field = "" + ## Server Address to get tank data from. # server_address = "127.0.0.1" ``` diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index 7670a538a1970..ebe502ac366af 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -3,14 +3,12 @@ package t128_tank import ( "context" "fmt" - "regexp" "strconv" - "strings" "sync" - "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" ) const ( @@ -30,24 +28,24 @@ var sampleConfig = ` ## Port Number to get tank data from. # port_number = 11011 +## A field name to display index number +# sequence_number_field = "" + ## Server Address to get tank data from. # server_address = "127.0.0.1" ` -var ( - typePattern = `,type=([^\s]+)` - recordTypePattern = `recordType=([^\s]+)` -) - type T128Tank struct { - IndexFile string `toml:"index_file"` - Topic string `toml:"topic"` - PortNumber int `toml:"port_number"` - ServerAddress string `toml:"server_address"` - Log telegraf.Logger - ctx context.Context - mainWG sync.WaitGroup - cancel context.CancelFunc + IndexFile string `toml:"index_file"` + Topic string `toml:"topic"` + PortNumber int `toml:"port_number"` + ServerAddress string `toml:"server_address"` + SequenceNumberField string `toml:"sequence_number_field"` + Log telegraf.Logger + ctx context.Context + mainWG sync.WaitGroup + cancel context.CancelFunc + parser parsers.Parser } func (*T128Tank) SampleConfig() string { @@ -58,6 +56,10 @@ func (*T128Tank) Description() string { return "Run TANK as a long-running input plugin" } +func (plugin *T128Tank) SetParser(parser parsers.Parser) { + plugin.parser = parser +} + func (plugin *T128Tank) Init() error { err := plugin.checkConfig() if err != nil { @@ -83,20 +85,16 @@ func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { return case messages := <-reader.sendChan: for _, message := range messages { - - tags := map[string]string{ - "index": strconv.FormatUint(message.Index.value, 10), + metrics, err := plugin.parser.Parse(message.Message) + if err != nil { + acc.AddError(err) } - if strings.ToLower(plugin.Topic) == "events" { - messageType := extractTopicType(string(message.Message), typePattern) - tags["type"] = messageType - } else if strings.ToLower(plugin.Topic) == "session_records" { - messageType := extractTopicType(string(message.Message), recordTypePattern) - tags["recordType"] = messageType + for _, metric := range metrics { + if plugin.SequenceNumberField != "" { + metric.AddField(plugin.SequenceNumberField, strconv.FormatUint(message.Index.value, 10)) + } + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } - acc.AddFields("t128_tank", map[string]interface{}{ - "message": string(message.Message), - }, tags, time.Now()) } } } @@ -118,15 +116,6 @@ func (plugin *T128Tank) Stop() { plugin.mainWG.Wait() } -func extractTopicType(message string, pattern string) string { - regex := regexp.MustCompile(pattern) - match := regex.FindStringSubmatch(message) - if len(match) > 1 { - return match[1] - } - return "" -} - func (plugin *T128Tank) checkConfig() error { if plugin.Topic == "" { return fmt.Errorf("topic is a required configuration field") From 8887ea12a5677f213f0333e210779cd5e15fcc74 Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Wed, 18 Oct 2023 12:02:19 -0400 Subject: [PATCH 02/13] added a new config option and updated the logic WAN-2496 #time 30m --- plugins/inputs/t128_tank/README.md | 6 ++++ plugins/inputs/t128_tank/t128_tank.go | 42 ++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/t128_tank/README.md b/plugins/inputs/t128_tank/README.md index c7f60f4824a6a..e9fcfd5270e9b 100644 --- a/plugins/inputs/t128_tank/README.md +++ b/plugins/inputs/t128_tank/README.md @@ -20,4 +20,10 @@ The tank input plugin collects data from a 128T. ## Server Address to get tank data from. # server_address = "127.0.0.1" + +## From specifies the first message we are interested in. +## If from is "beginning" or "start", it will start consuming from the +## first available message in the selected topic. If it is "eof" or "end", +## it will tail the topic for newly produced messages. +# from = "end" ``` diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index ebe502ac366af..9270d694cded6 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "strings" "sync" "github.com/influxdata/telegraf" @@ -33,6 +34,12 @@ var sampleConfig = ` ## Server Address to get tank data from. # server_address = "127.0.0.1" + +## From specifies the first message we are interested in. +## If from is "beginning" or "start", it will start consuming from the +## first available message in the selected topic. If it is "eof" or "end", +## it will tail the topic for newly produced messages. +# from = "end" ` type T128Tank struct { @@ -41,11 +48,14 @@ type T128Tank struct { PortNumber int `toml:"port_number"` ServerAddress string `toml:"server_address"` SequenceNumberField string `toml:"sequence_number_field"` - Log telegraf.Logger - ctx context.Context - mainWG sync.WaitGroup - cancel context.CancelFunc - parser parsers.Parser + From string `toml:"from"` + + Log telegraf.Logger + ctx context.Context + mainWG sync.WaitGroup + cancel context.CancelFunc + parser parsers.Parser + indexValue index } func (*T128Tank) SampleConfig() string { @@ -75,7 +85,7 @@ func (plugin *T128Tank) Gather(_ telegraf.Accumulator) error { func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { plugin.ctx, plugin.cancel = context.WithCancel(context.Background()) - reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, StartIndex, plugin.Log, acc) + reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, plugin.indexValue, plugin.Log, acc) plugin.mainWG.Add(1) go func() { defer plugin.mainWG.Done() @@ -93,7 +103,7 @@ func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { if plugin.SequenceNumberField != "" { metric.AddField(plugin.SequenceNumberField, strconv.FormatUint(message.Index.value, 10)) } - acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + acc.AddMetric(metric) } } } @@ -121,6 +131,24 @@ func (plugin *T128Tank) checkConfig() error { return fmt.Errorf("topic is a required configuration field") } + if plugin.IndexFile == "" { + if strings.ToLower(plugin.From) == "end" || strings.ToLower(plugin.From) == "eof" { + plugin.indexValue = EndIndex + } else if strings.ToLower(plugin.From) == "beginning" || strings.ToLower(plugin.From) == "start" { + plugin.indexValue = StartIndex + } else { + plugin.indexValue = EndIndex + } + } else { + if strings.ToLower(plugin.From) == "end" || strings.ToLower(plugin.From) == "eof" { + plugin.indexValue = EndIndex + } else if strings.ToLower(plugin.From) == "beginning" || strings.ToLower(plugin.From) == "start" { + plugin.indexValue = StartIndex + } else { + plugin.indexValue = StartIndex + } + } + return nil } From 5245bfde8ebb1dffecebc91fabf5a9b50308150f Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Thu, 19 Oct 2023 09:00:24 -0400 Subject: [PATCH 03/13] addressed more comments and simplified logic WAN-2496 #time 20m --- plugins/inputs/t128_tank/README.md | 7 +-- plugins/inputs/t128_tank/reader.go | 25 ++++++---- plugins/inputs/t128_tank/t128_tank.go | 69 ++++++++++++++++----------- 3 files changed, 59 insertions(+), 42 deletions(-) diff --git a/plugins/inputs/t128_tank/README.md b/plugins/inputs/t128_tank/README.md index e9fcfd5270e9b..643bb23770db3 100644 --- a/plugins/inputs/t128_tank/README.md +++ b/plugins/inputs/t128_tank/README.md @@ -6,6 +6,7 @@ The tank input plugin collects data from a 128T. ```toml [[inputs.t128_tank]] +## A (unique) file to use for index tracking. ## This tracking allows each event to be produced once. # index_file = "" @@ -22,8 +23,8 @@ The tank input plugin collects data from a 128T. # server_address = "127.0.0.1" ## From specifies the first message we are interested in. -## If from is "beginning" or "start", it will start consuming from the -## first available message in the selected topic. If it is "eof" or "end", -## it will tail the topic for newly produced messages. +## If from is "start", it will start consuming from the +## first available message in the selected topic. +## If it is "end", it will tail the topic for newly produced messages. # from = "end" ``` diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 9ecc5acb9723d..9a4804c936e13 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -171,23 +171,28 @@ func (r *Reader) getIndex(indexPath string, defaultIndex index) (index, error) { r.log.Debugf("index file path not provided, starting with default index %s", defaultIndex.string()) return defaultIndex, nil } - content, err := os.ReadFile(indexPath) - if errors.Is(err, os.ErrNotExist) { + content, err := os.Open(indexPath) + if os.IsNotExist(err) { r.log.Debugf("index file %s does not exist, starting with default index %s", indexPath, defaultIndex.string()) return defaultIndex, nil } else if err != nil { return defaultIndex, fmt.Errorf("encountered error reading index file, starting with default index %s: %s", defaultIndex.string(), err) } - - r.log.Debugf("found '%s' in index file", content) - - index, err := newIndex(string(content)) - if err != nil { - return defaultIndex, fmt.Errorf("encountered error while parsing index file content, starting with default index %s: %s", defaultIndex.string(), err) + defer content.Close() + scanner := bufio.NewScanner(content) + if scanner.Scan() { + data := scanner.Text() + data = strings.TrimSpace(data) + r.log.Debugf("found '%s' in index file", data) + index, err := newIndex(data) + if err != nil { + return defaultIndex, fmt.Errorf("encountered error while parsing index file content, starting with default index %s: %s", defaultIndex.string(), err) + } + return index, nil + } else { + return defaultIndex, fmt.Errorf("encountered error while reading index file content, starting with default index %s", defaultIndex.string()) } - - return index, nil } func (r *Reader) setIndex(indexPath string, index index) { diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index 9270d694cded6..176f623e11749 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -2,6 +2,7 @@ package t128_tank import ( "context" + "errors" "fmt" "strconv" "strings" @@ -36,26 +37,25 @@ var sampleConfig = ` # server_address = "127.0.0.1" ## From specifies the first message we are interested in. -## If from is "beginning" or "start", it will start consuming from the -## first available message in the selected topic. If it is "eof" or "end", -## it will tail the topic for newly produced messages. +## If from is "start", it will start consuming from the +## first available message in the selected topic. +## If it is "end", it will tail the topic for newly produced messages. # from = "end" ` type T128Tank struct { - IndexFile string `toml:"index_file"` - Topic string `toml:"topic"` - PortNumber int `toml:"port_number"` - ServerAddress string `toml:"server_address"` - SequenceNumberField string `toml:"sequence_number_field"` - From string `toml:"from"` - - Log telegraf.Logger - ctx context.Context - mainWG sync.WaitGroup - cancel context.CancelFunc - parser parsers.Parser - indexValue index + IndexFile string `toml:"index_file"` + Topic string `toml:"topic"` + PortNumber int `toml:"port_number"` + ServerAddress string `toml:"server_address"` + SequenceNumberField string `toml:"sequence_number_field"` + From string `toml:"from"` + Log telegraf.Logger + ctx context.Context + mainWG sync.WaitGroup + cancel context.CancelFunc + parser parsers.Parser + defaultStartingIndex index } func (*T128Tank) SampleConfig() string { @@ -85,7 +85,7 @@ func (plugin *T128Tank) Gather(_ telegraf.Accumulator) error { func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { plugin.ctx, plugin.cancel = context.WithCancel(context.Background()) - reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, plugin.indexValue, plugin.Log, acc) + reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, plugin.defaultStartingIndex, plugin.Log, acc) plugin.mainWG.Add(1) go func() { defer plugin.mainWG.Done() @@ -131,24 +131,35 @@ func (plugin *T128Tank) checkConfig() error { return fmt.Errorf("topic is a required configuration field") } - if plugin.IndexFile == "" { - if strings.ToLower(plugin.From) == "end" || strings.ToLower(plugin.From) == "eof" { - plugin.indexValue = EndIndex - } else if strings.ToLower(plugin.From) == "beginning" || strings.ToLower(plugin.From) == "start" { - plugin.indexValue = StartIndex - } else { - plugin.indexValue = EndIndex + if plugin.From != "" { + err := validateFrom(plugin.From) + if err != nil { + return fmt.Errorf("%s", err) } + } + + if strings.ToLower(plugin.From) == "end" { + plugin.defaultStartingIndex = EndIndex + } else if strings.ToLower(plugin.From) == "start" { + plugin.defaultStartingIndex = StartIndex } else { - if strings.ToLower(plugin.From) == "end" || strings.ToLower(plugin.From) == "eof" { - plugin.indexValue = EndIndex - } else if strings.ToLower(plugin.From) == "beginning" || strings.ToLower(plugin.From) == "start" { - plugin.indexValue = StartIndex + if plugin.IndexFile == "" { + plugin.defaultStartingIndex = EndIndex } else { - plugin.indexValue = StartIndex + plugin.defaultStartingIndex = StartIndex } } + return nil +} +func validateFrom(from string) error { + validFromValues := map[string]bool{ + "start": true, + "end": true, + } + if _, ok := validFromValues[strings.ToLower(from)]; !ok { + return errors.New("Invalid from value. Accepted values are 'start' or 'end'.") + } return nil } From a41c3cb5c57d79b6e6dd9c54f7d1b974729bd716 Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Thu, 19 Oct 2023 16:26:16 -0400 Subject: [PATCH 04/13] addressed comments WAN-2496 #time 5m --- plugins/inputs/t128_tank/reader.go | 24 +++++++++--------------- plugins/inputs/t128_tank/t128_tank.go | 9 +++------ 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 9a4804c936e13..19a1de8ee293b 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -171,28 +171,22 @@ func (r *Reader) getIndex(indexPath string, defaultIndex index) (index, error) { r.log.Debugf("index file path not provided, starting with default index %s", defaultIndex.string()) return defaultIndex, nil } - content, err := os.Open(indexPath) - if os.IsNotExist(err) { + content, err := os.ReadFile(indexPath) + if errors.Is(err, os.ErrNotExist) { r.log.Debugf("index file %s does not exist, starting with default index %s", indexPath, defaultIndex.string()) return defaultIndex, nil } else if err != nil { return defaultIndex, fmt.Errorf("encountered error reading index file, starting with default index %s: %s", defaultIndex.string(), err) } - defer content.Close() - scanner := bufio.NewScanner(content) - if scanner.Scan() { - data := scanner.Text() - data = strings.TrimSpace(data) - r.log.Debugf("found '%s' in index file", data) - index, err := newIndex(data) - if err != nil { - return defaultIndex, fmt.Errorf("encountered error while parsing index file content, starting with default index %s: %s", defaultIndex.string(), err) - } - return index, nil - } else { - return defaultIndex, fmt.Errorf("encountered error while reading index file content, starting with default index %s", defaultIndex.string()) + newContent := strings.Split(string(content), "\n")[0] + r.log.Debugf("found '%s' in index file", newContent) + index, err := newIndex(newContent) + if err != nil { + return defaultIndex, fmt.Errorf("encountered error while parsing index file content, starting with default index %s: %s", defaultIndex.string(), err) } + + return index, nil } func (r *Reader) setIndex(indexPath string, index index) { diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index 176f623e11749..a0d43a8f9adfd 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -134,7 +134,7 @@ func (plugin *T128Tank) checkConfig() error { if plugin.From != "" { err := validateFrom(plugin.From) if err != nil { - return fmt.Errorf("%s", err) + return err } } @@ -153,11 +153,8 @@ func (plugin *T128Tank) checkConfig() error { } func validateFrom(from string) error { - validFromValues := map[string]bool{ - "start": true, - "end": true, - } - if _, ok := validFromValues[strings.ToLower(from)]; !ok { + fromLower := strings.ToLower(from) + if fromLower != "start" && fromLower != "end" { return errors.New("Invalid from value. Accepted values are 'start' or 'end'.") } return nil From cbc29621f7d06b167b9477a383b2cfc4909a358f Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Thu, 19 Oct 2023 22:12:11 -0400 Subject: [PATCH 05/13] updated the index-file config WAN-2496 #time 5m --- plugins/inputs/t128_tank/README.md | 2 +- plugins/inputs/t128_tank/t128_tank.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/t128_tank/README.md b/plugins/inputs/t128_tank/README.md index 643bb23770db3..3a0e8f5e9b610 100644 --- a/plugins/inputs/t128_tank/README.md +++ b/plugins/inputs/t128_tank/README.md @@ -8,7 +8,7 @@ The tank input plugin collects data from a 128T. [[inputs.t128_tank]] ## A (unique) file to use for index tracking. ## This tracking allows each event to be produced once. -# index_file = "" +# index-file = "" ## Required. The TANK topic to consume. # topic = "events" diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index a0d43a8f9adfd..b10207dcc3b48 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -22,7 +22,7 @@ var sampleConfig = ` [[inputs.t128_tank]] ## A (unique) file to use for index tracking. ## This tracking allows each event to be produced once. -# index_file = "" +# index-file = "" ## Required. The TANK topic to consume. # topic = "events" @@ -44,7 +44,7 @@ var sampleConfig = ` ` type T128Tank struct { - IndexFile string `toml:"index_file"` + IndexFile string `toml:"index-file"` Topic string `toml:"topic"` PortNumber int `toml:"port_number"` ServerAddress string `toml:"server_address"` From 1b4cff8b93c094ba24054310fa6131542c005aff Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Fri, 20 Oct 2023 20:11:32 -0400 Subject: [PATCH 06/13] fixed an issue with last index not being saved WAN-2496 #time 15m --- plugins/inputs/t128_tank/reader.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 19a1de8ee293b..99103127881b4 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -105,7 +105,7 @@ type Reader struct { } func NewReader(tankAddress string, tankPort int, topic string, indexPath string, defaultIndex index, log telegraf.Logger, acc telegraf.Accumulator) *Reader { - return &Reader{ + r := &Reader{ topic: topic, tankReadCmdCtx: exec.CommandContext, sendChan: make(chan []IndexedMessage), @@ -118,6 +118,13 @@ func NewReader(tankAddress string, tankPort int, topic string, indexPath string, log: log, acc: acc, } + lastSavedIndex, err := r.getIndex(indexPath, defaultIndex) + if err != nil { + r.log.Errorf("Error loading last saved index: %v", err) + lastSavedIndex = r.defaultIndex + } + r.lastSavedIndex = lastSavedIndex.value + return r } func (r *Reader) withTankReadCommandContext(tankReadCmdCtx CommandContext) *Reader { @@ -270,6 +277,10 @@ func (r *Reader) readFromTank(readCtx context.Context, startingIndex index) (err for _, message := range messages { collectedMessages = append(collectedMessages, *message) } + if len(collectedMessages) > 0 { + r.lastSavedIndex = collectedMessages[len(collectedMessages)-1].Index.value + r.setIndex(r.indexPath, index{value: r.lastSavedIndex}) + } select { case <-readCtx.Done(): return nil From 89f49422d52c47855f0a8d55a4c5a1600c061ff7 Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Tue, 24 Oct 2023 07:52:21 -0400 Subject: [PATCH 07/13] addressed pr comments WAN-2496 #time 30m --- plugins/inputs/t128_tank/reader.go | 49 ++++++++++++++---------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 99103127881b4..01bf86790c222 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -105,7 +105,7 @@ type Reader struct { } func NewReader(tankAddress string, tankPort int, topic string, indexPath string, defaultIndex index, log telegraf.Logger, acc telegraf.Accumulator) *Reader { - r := &Reader{ + return &Reader{ topic: topic, tankReadCmdCtx: exec.CommandContext, sendChan: make(chan []IndexedMessage), @@ -118,13 +118,6 @@ func NewReader(tankAddress string, tankPort int, topic string, indexPath string, log: log, acc: acc, } - lastSavedIndex, err := r.getIndex(indexPath, defaultIndex) - if err != nil { - r.log.Errorf("Error loading last saved index: %v", err) - lastSavedIndex = r.defaultIndex - } - r.lastSavedIndex = lastSavedIndex.value - return r } func (r *Reader) withTankReadCommandContext(tankReadCmdCtx CommandContext) *Reader { @@ -132,42 +125,47 @@ func (r *Reader) withTankReadCommandContext(tankReadCmdCtx CommandContext) *Read return r } +func (r *Reader) saveIndex(indexValue uint64) { + r.lastSavedIndex = indexValue + r.setIndex(r.indexPath, index{value: indexValue}) +} + func (r *Reader) Run(mainCtx context.Context) { readCtx, readCtxCancel := context.WithCancel(mainCtx) lastIndex, err := r.getIndex(r.indexPath, r.defaultIndex) - r.lastSavedIndex = lastIndex.value if err != nil { r.log.Errorf("Error in get index %v", err) readCtxCancel() return } - nextSaveCheck := time.After(2 * time.Second) - defer func() { - readCtxCancel() - r.setIndex(r.indexPath, index{value: r.lastSavedIndex}) - }() - + nextSaveCheck := time.NewTicker(2 * time.Second) + defer readCtxCancel() go r.read(readCtx, lastIndex.next()) for { + select { case <-mainCtx.Done(): r.log.Errorf("%s reader done", r.topic) + r.saveIndex(r.lastSavedIndex) return - case <-nextSaveCheck: + case <-nextSaveCheck.C: if lastIndex.value > r.lastSavedIndex { - r.setIndex(r.indexPath, lastIndex) - r.lastSavedIndex = lastIndex.value + r.saveIndex(lastIndex.value) + } + if lastIndex.value%1000 == 0 { + r.saveIndex(lastIndex.value) } - nextSaveCheck = time.After(2 * time.Second) case err := <-r.readDone: var errBoundaryFault *boundaryFault if err != nil && errors.As(err, &errBoundaryFault) { r.log.Debugf("detected boundary fault, restarting %s tank read from index %d", r.topic, errBoundaryFault.nextAvailableIndex.value) - go r.read(readCtx, errBoundaryFault.nextAvailableIndex) + lastIndex = errBoundaryFault.nextAvailableIndex + go r.read(readCtx, lastIndex) } else { - go r.read(readCtx, lastIndex.next()) + lastIndex = lastIndex.next() + go r.read(readCtx, lastIndex) } } } @@ -227,7 +225,9 @@ func (r *Reader) read(readCtx context.Context, startingIndex index) { time.Sleep(r.restartDelay) } - r.readDone <- err + defer func() { + r.readDone <- err + }() } func (r *Reader) readFromTank(readCtx context.Context, startingIndex index) (err error) { @@ -276,10 +276,7 @@ func (r *Reader) readFromTank(readCtx context.Context, startingIndex index) (err var collectedMessages []IndexedMessage for _, message := range messages { collectedMessages = append(collectedMessages, *message) - } - if len(collectedMessages) > 0 { - r.lastSavedIndex = collectedMessages[len(collectedMessages)-1].Index.value - r.setIndex(r.indexPath, index{value: r.lastSavedIndex}) + r.lastSavedIndex = message.Index.value } select { case <-readCtx.Done(): From 3ae649ae2e0be05c6cd8b175805ea5f06bf030f4 Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Tue, 24 Oct 2023 13:07:36 -0400 Subject: [PATCH 08/13] added data precision WAN-2496 #time 30m --- plugins/inputs/t128_tank/t128_tank.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index b10207dcc3b48..a2706fe986d00 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -7,8 +7,10 @@ import ( "strconv" "strings" "sync" + "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" ) @@ -44,18 +46,20 @@ var sampleConfig = ` ` type T128Tank struct { - IndexFile string `toml:"index-file"` - Topic string `toml:"topic"` - PortNumber int `toml:"port_number"` - ServerAddress string `toml:"server_address"` - SequenceNumberField string `toml:"sequence_number_field"` - From string `toml:"from"` + IndexFile string `toml:"index-file"` + Topic string `toml:"topic"` + PortNumber int `toml:"port_number"` + ServerAddress string `toml:"server_address"` + SequenceNumberField string `toml:"sequence_number_field"` + From string `toml:"from"` + Precision config.Duration `toml:"data_precision"` Log telegraf.Logger ctx context.Context mainWG sync.WaitGroup cancel context.CancelFunc parser parsers.Parser defaultStartingIndex index + adjustTime func(telegraf.Metric) } func (*T128Tank) SampleConfig() string { @@ -76,6 +80,12 @@ func (plugin *T128Tank) Init() error { return err } + if plugin.Precision != config.Duration(time.Nanosecond) { + plugin.adjustTime = func(m telegraf.Metric) { + adjustedNano := m.Time().UnixNano() * int64(plugin.Precision) + m.SetTime(time.Unix(0, adjustedNano)) + } + } return nil } @@ -84,6 +94,9 @@ func (plugin *T128Tank) Gather(_ telegraf.Accumulator) error { } func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { + if plugin.adjustTime == nil { + plugin.adjustTime = func(m telegraf.Metric) {} + } plugin.ctx, plugin.cancel = context.WithCancel(context.Background()) reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, plugin.defaultStartingIndex, plugin.Log, acc) plugin.mainWG.Add(1) @@ -103,6 +116,7 @@ func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { if plugin.SequenceNumberField != "" { metric.AddField(plugin.SequenceNumberField, strconv.FormatUint(message.Index.value, 10)) } + plugin.adjustTime(metric) acc.AddMetric(metric) } } From d10c59563854a144f9ef3ca6c66c57290a8d28e2 Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Wed, 25 Oct 2023 00:03:28 -0400 Subject: [PATCH 09/13] addressed more comments WAN-2496 #time 1h --- plugins/inputs/t128_tank/reader.go | 69 ++++++++++++++++----------- plugins/inputs/t128_tank/t128_tank.go | 8 +++- 2 files changed, 47 insertions(+), 30 deletions(-) diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 01bf86790c222..8c70cd9c74b2a 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -80,8 +80,8 @@ func (i index) next() index { type CommandContext = func(ctx context.Context, name string, arg ...string) *exec.Cmd type Reader struct { - topic string - lastSavedIndex uint64 + topic string + lastObservedIndex chan uint64 // used to send events from the read routine to the send routine sendChan chan []IndexedMessage // the target address for the TANK instance @@ -106,17 +106,18 @@ type Reader struct { func NewReader(tankAddress string, tankPort int, topic string, indexPath string, defaultIndex index, log telegraf.Logger, acc telegraf.Accumulator) *Reader { return &Reader{ - topic: topic, - tankReadCmdCtx: exec.CommandContext, - sendChan: make(chan []IndexedMessage), - readDone: make(chan error), - restartDelay: 5 * time.Second, - tankAddress: tankAddress, - tankPort: tankPort, - indexPath: indexPath, - defaultIndex: defaultIndex, - log: log, - acc: acc, + topic: topic, + lastObservedIndex: make(chan uint64, 1), + tankReadCmdCtx: exec.CommandContext, + sendChan: make(chan []IndexedMessage), + readDone: make(chan error), + restartDelay: 5 * time.Second, + tankAddress: tankAddress, + tankPort: tankPort, + indexPath: indexPath, + defaultIndex: defaultIndex, + log: log, + acc: acc, } } @@ -125,11 +126,6 @@ func (r *Reader) withTankReadCommandContext(tankReadCmdCtx CommandContext) *Read return r } -func (r *Reader) saveIndex(indexValue uint64) { - r.lastSavedIndex = indexValue - r.setIndex(r.indexPath, index{value: indexValue}) -} - func (r *Reader) Run(mainCtx context.Context) { readCtx, readCtxCancel := context.WithCancel(mainCtx) lastIndex, err := r.getIndex(r.indexPath, r.defaultIndex) @@ -138,9 +134,17 @@ func (r *Reader) Run(mainCtx context.Context) { readCtxCancel() return } - nextSaveCheck := time.NewTicker(2 * time.Second) - defer readCtxCancel() + defer func() { + readCtxCancel() + close(r.lastObservedIndex) + observedValue, ok := <-r.lastObservedIndex + if ok { + r.setIndex(r.indexPath, index{value: observedValue}) + } else { + r.setIndex(r.indexPath, index{value: lastIndex.value}) + } + }() go r.read(readCtx, lastIndex.next()) for { @@ -148,14 +152,23 @@ func (r *Reader) Run(mainCtx context.Context) { select { case <-mainCtx.Done(): r.log.Errorf("%s reader done", r.topic) - r.saveIndex(r.lastSavedIndex) return + case observedValue := <-r.lastObservedIndex: + lastIndex.value = observedValue + if lastIndex.value%1000 == 0 { + r.setIndex(r.indexPath, index{value: lastIndex.value}) + } case <-nextSaveCheck.C: - if lastIndex.value > r.lastSavedIndex { - r.saveIndex(lastIndex.value) + var observedValue uint64 + select { + case observedValue = <-r.lastObservedIndex: + default: + continue } - if lastIndex.value%1000 == 0 { - r.saveIndex(lastIndex.value) + if lastIndex.value > observedValue { + lastIndex.value = observedValue + r.lastObservedIndex <- lastIndex.value + r.setIndex(r.indexPath, index{value: lastIndex.value}) } case err := <-r.readDone: var errBoundaryFault *boundaryFault @@ -225,9 +238,7 @@ func (r *Reader) read(readCtx context.Context, startingIndex index) { time.Sleep(r.restartDelay) } - defer func() { - r.readDone <- err - }() + r.readDone <- err } func (r *Reader) readFromTank(readCtx context.Context, startingIndex index) (err error) { @@ -276,7 +287,7 @@ func (r *Reader) readFromTank(readCtx context.Context, startingIndex index) (err var collectedMessages []IndexedMessage for _, message := range messages { collectedMessages = append(collectedMessages, *message) - r.lastSavedIndex = message.Index.value + r.lastObservedIndex <- message.Index.value } select { case <-readCtx.Done(): diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index a2706fe986d00..4ee9be08fa480 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -95,7 +95,13 @@ func (plugin *T128Tank) Gather(_ telegraf.Accumulator) error { func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { if plugin.adjustTime == nil { - plugin.adjustTime = func(m telegraf.Metric) {} + unreasonableTimestamp := time.Unix(0, 0).Add(24 * time.Hour) + plugin.adjustTime = func(m telegraf.Metric) { + mTime := m.Time() + if mTime.Before(unreasonableTimestamp) { + m.SetTime(unreasonableTimestamp.Truncate(time.Second)) + } + } } plugin.ctx, plugin.cancel = context.WithCancel(context.Background()) reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, plugin.defaultStartingIndex, plugin.Log, acc) From 3f918e6741edbd86c024b22e63303aebf3cfee65 Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Wed, 25 Oct 2023 09:15:01 -0400 Subject: [PATCH 10/13] addressed comments and test for lastobserved value WAN-2496 #time 20m --- plugins/inputs/t128_tank/reader.go | 19 ++++--------------- plugins/inputs/t128_tank/t128_tank.go | 10 +++++----- 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 8c70cd9c74b2a..5d320612f3ee1 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -100,11 +100,9 @@ type Reader struct { defaultIndex index // telegraf Logger log telegraf.Logger - // telegraf accumulator - acc telegraf.Accumulator } -func NewReader(tankAddress string, tankPort int, topic string, indexPath string, defaultIndex index, log telegraf.Logger, acc telegraf.Accumulator) *Reader { +func NewReader(tankAddress string, tankPort int, topic string, indexPath string, defaultIndex index, log telegraf.Logger) *Reader { return &Reader{ topic: topic, lastObservedIndex: make(chan uint64, 1), @@ -117,7 +115,6 @@ func NewReader(tankAddress string, tankPort int, topic string, indexPath string, indexPath: indexPath, defaultIndex: defaultIndex, log: log, - acc: acc, } } @@ -154,20 +151,12 @@ func (r *Reader) Run(mainCtx context.Context) { r.log.Errorf("%s reader done", r.topic) return case observedValue := <-r.lastObservedIndex: - lastIndex.value = observedValue - if lastIndex.value%1000 == 0 { + if lastIndex.value < observedValue { + lastIndex.value = observedValue r.setIndex(r.indexPath, index{value: lastIndex.value}) } case <-nextSaveCheck.C: - var observedValue uint64 - select { - case observedValue = <-r.lastObservedIndex: - default: - continue - } - if lastIndex.value > observedValue { - lastIndex.value = observedValue - r.lastObservedIndex <- lastIndex.value + if lastIndex.value%1000 == 0 { r.setIndex(r.indexPath, index{value: lastIndex.value}) } case err := <-r.readDone: diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index 4ee9be08fa480..11e36b3e27ad2 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -95,16 +95,16 @@ func (plugin *T128Tank) Gather(_ telegraf.Accumulator) error { func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { if plugin.adjustTime == nil { - unreasonableTimestamp := time.Unix(0, 0).Add(24 * time.Hour) + unreasonableTimestamp := time.Unix(24*60*60, 0) plugin.adjustTime = func(m telegraf.Metric) { - mTime := m.Time() - if mTime.Before(unreasonableTimestamp) { - m.SetTime(unreasonableTimestamp.Truncate(time.Second)) + mTime := m.Time().Unix() + if mTime < unreasonableTimestamp.Unix() { + m.SetTime(unreasonableTimestamp) } } } plugin.ctx, plugin.cancel = context.WithCancel(context.Background()) - reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, plugin.defaultStartingIndex, plugin.Log, acc) + reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, plugin.defaultStartingIndex, plugin.Log) plugin.mainWG.Add(1) go func() { defer plugin.mainWG.Done() From 198e280f032d0a36ada285395ed01cb70a191123 Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Wed, 25 Oct 2023 11:51:12 -0400 Subject: [PATCH 11/13] added testcase WAN-2496 #time 20m --- plugins/inputs/t128_tank/t128_tank.go | 9 +-- plugins/inputs/t128_tank/t128_tank_test.go | 66 ++++++++++++++++++++-- 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index 11e36b3e27ad2..02f71480d87db 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -95,11 +95,12 @@ func (plugin *T128Tank) Gather(_ telegraf.Accumulator) error { func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { if plugin.adjustTime == nil { - unreasonableTimestamp := time.Unix(24*60*60, 0) + unreasonableTimestamp := time.Unix(0, 0).Add(24 * time.Hour) plugin.adjustTime = func(m telegraf.Metric) { - mTime := m.Time().Unix() - if mTime < unreasonableTimestamp.Unix() { - m.SetTime(unreasonableTimestamp) + mTime := m.Time() + if mTime.Before(unreasonableTimestamp) { + adjustedTime := unreasonableTimestamp.Unix() + m.SetTime(time.Unix(adjustedTime, 0)) } } } diff --git a/plugins/inputs/t128_tank/t128_tank_test.go b/plugins/inputs/t128_tank/t128_tank_test.go index 22003ede53dcd..d6c95b510fb75 100644 --- a/plugins/inputs/t128_tank/t128_tank_test.go +++ b/plugins/inputs/t128_tank/t128_tank_test.go @@ -11,6 +11,9 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" ) @@ -78,7 +81,6 @@ func TestT128TankReader(t *testing.T) { ServerAddress: testcase.ServerAddress, } - var acc testutil.Accumulator var receivedMessages []IndexedMessage ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup @@ -95,7 +97,6 @@ func TestT128TankReader(t *testing.T) { plugin.IndexFile, testcase.DefaultIndex, testutil.Logger{}, - &acc, ).withTankReadCommandContext(testcase.TankReadCommandContext) wg.Add(1) go func() { @@ -155,7 +156,6 @@ func TestBoundaryFault(t *testing.T) { ServerAddress: testcase.ServerAddress, } - var acc testutil.Accumulator var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -173,7 +173,6 @@ func TestBoundaryFault(t *testing.T) { plugin.IndexFile, testcase.DefaultIndex, testutil.Logger{}, - &acc, ).withTankReadCommandContext(testcase.TankReadCommandContext) var receivedErrorMessage string wg.Add(1) @@ -240,3 +239,62 @@ func TestIndexParsing(t *testing.T) { _, err = newIndex("foo") assert.Error(t, err) } + +func newMetric(name string, tags map[string]string, fields map[string]interface{}) telegraf.Metric { + if tags == nil { + tags = map[string]string{} + } + if fields == nil { + fields = map[string]interface{}{} + } + m := metric.New(name, tags, fields, time.Now()) + return m +} + +func TestUnreasonableTimestamp(t *testing.T) { + testCases := []struct { + Name string + UnreasonableTimestamp time.Time + ExpectedMessage time.Time + Precision time.Duration + }{ + { + Name: "Precision in Nanosecond", + ExpectedMessage: time.Unix(24*60*60, 0), + Precision: time.Nanosecond, + }, + { + Name: "Precision is Empty", + ExpectedMessage: time.Unix(0, 0), + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + fmt.Println(testCase.Name) + metric := newMetric("test_metric", nil, nil) + metric.SetTime(testCase.UnreasonableTimestamp) + + plugin := &T128Tank{ + Topic: "test", + Precision: config.Duration(testCase.Precision), + } + plugin.Init() + if plugin.adjustTime == nil { + unreasonableTimestamp := time.Unix(0, 0).Add(24 * time.Hour) + plugin.adjustTime = func(m telegraf.Metric) { + mTime := m.Time() + if mTime.Before(unreasonableTimestamp) { + adjustedSeconds := unreasonableTimestamp.Unix() + m.SetTime(time.Unix(adjustedSeconds, 0)) + } + } + } + plugin.adjustTime(metric) + + if !metric.Time().Equal(testCase.ExpectedMessage) { + t.Errorf("Expected time: %s, Actual time: %s", testCase.ExpectedMessage, metric.Time()) + } + }) + } +} From 4e05389cc918fd872dbfe9ca38c2529964babed4 Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Wed, 25 Oct 2023 16:09:39 -0400 Subject: [PATCH 12/13] addressed comments WAN-2496 #time 15m --- plugins/inputs/t128_tank/reader.go | 11 ++++++----- plugins/inputs/t128_tank/t128_tank.go | 2 +- plugins/inputs/t128_tank/t128_tank_test.go | 19 ++++--------------- 3 files changed, 11 insertions(+), 21 deletions(-) diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 5d320612f3ee1..9c36be9662035 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -125,6 +125,7 @@ func (r *Reader) withTankReadCommandContext(tankReadCmdCtx CommandContext) *Read func (r *Reader) Run(mainCtx context.Context) { readCtx, readCtxCancel := context.WithCancel(mainCtx) + var observedValue uint64 lastIndex, err := r.getIndex(r.indexPath, r.defaultIndex) if err != nil { r.log.Errorf("Error in get index %v", err) @@ -145,20 +146,20 @@ func (r *Reader) Run(mainCtx context.Context) { go r.read(readCtx, lastIndex.next()) for { - select { case <-mainCtx.Done(): r.log.Errorf("%s reader done", r.topic) return - case observedValue := <-r.lastObservedIndex: - if lastIndex.value < observedValue { - lastIndex.value = observedValue + case observedValue = <-r.lastObservedIndex: + if lastIndex.value%1000 == 0 { r.setIndex(r.indexPath, index{value: lastIndex.value}) } case <-nextSaveCheck.C: - if lastIndex.value%1000 == 0 { + if observedValue > lastIndex.value { + lastIndex.value = observedValue r.setIndex(r.indexPath, index{value: lastIndex.value}) } + case err := <-r.readDone: var errBoundaryFault *boundaryFault if err != nil && errors.As(err, &errBoundaryFault) { diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index 02f71480d87db..02bccad0ac8f0 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -99,7 +99,7 @@ func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { plugin.adjustTime = func(m telegraf.Metric) { mTime := m.Time() if mTime.Before(unreasonableTimestamp) { - adjustedTime := unreasonableTimestamp.Unix() + adjustedTime := unreasonableTimestamp.Unix() * int64(plugin.Precision) m.SetTime(time.Unix(adjustedTime, 0)) } } diff --git a/plugins/inputs/t128_tank/t128_tank_test.go b/plugins/inputs/t128_tank/t128_tank_test.go index d6c95b510fb75..bab7cef69317f 100644 --- a/plugins/inputs/t128_tank/t128_tank_test.go +++ b/plugins/inputs/t128_tank/t128_tank_test.go @@ -247,7 +247,7 @@ func newMetric(name string, tags map[string]string, fields map[string]interface{ if fields == nil { fields = map[string]interface{}{} } - m := metric.New(name, tags, fields, time.Now()) + m := metric.New(name, tags, fields, time.Date(1960, time.October, 25, 12, 0, 0, 0, time.UTC)) return m } @@ -272,29 +272,18 @@ func TestUnreasonableTimestamp(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.Name, func(t *testing.T) { fmt.Println(testCase.Name) + var acc testutil.Accumulator metric := newMetric("test_metric", nil, nil) - metric.SetTime(testCase.UnreasonableTimestamp) plugin := &T128Tank{ Topic: "test", Precision: config.Duration(testCase.Precision), } plugin.Init() - if plugin.adjustTime == nil { - unreasonableTimestamp := time.Unix(0, 0).Add(24 * time.Hour) - plugin.adjustTime = func(m telegraf.Metric) { - mTime := m.Time() - if mTime.Before(unreasonableTimestamp) { - adjustedSeconds := unreasonableTimestamp.Unix() - m.SetTime(time.Unix(adjustedSeconds, 0)) - } - } - } + plugin.Start(&acc) plugin.adjustTime(metric) - if !metric.Time().Equal(testCase.ExpectedMessage) { - t.Errorf("Expected time: %s, Actual time: %s", testCase.ExpectedMessage, metric.Time()) - } + assert.Equal(t, testCase.ExpectedMessage, metric.Time()) }) } } From 65dcd5a2e1d9ad8b2207ca27d0bcca10cb6f6422 Mon Sep 17 00:00:00 2001 From: Greg Schrock Date: Mon, 30 Oct 2023 11:41:11 -0400 Subject: [PATCH 13/13] Fix timestamp precision adjustments WAN-2496 #time 1h --- plugins/inputs/t128_tank/t128_tank.go | 43 +++++++---- plugins/inputs/t128_tank/t128_tank_test.go | 88 ++++++++++++++++++---- 2 files changed, 101 insertions(+), 30 deletions(-) diff --git a/plugins/inputs/t128_tank/t128_tank.go b/plugins/inputs/t128_tank/t128_tank.go index 02bccad0ac8f0..01c5c6e11bc2f 100644 --- a/plugins/inputs/t128_tank/t128_tank.go +++ b/plugins/inputs/t128_tank/t128_tank.go @@ -46,13 +46,13 @@ var sampleConfig = ` ` type T128Tank struct { - IndexFile string `toml:"index-file"` - Topic string `toml:"topic"` - PortNumber int `toml:"port_number"` - ServerAddress string `toml:"server_address"` - SequenceNumberField string `toml:"sequence_number_field"` - From string `toml:"from"` - Precision config.Duration `toml:"data_precision"` + IndexFile string `toml:"index-file"` + Topic string `toml:"topic"` + PortNumber int `toml:"port_number"` + ServerAddress string `toml:"server_address"` + SequenceNumberField string `toml:"sequence_number_field"` + From string `toml:"from"` + Precision *config.Duration `toml:"data_precision"` Log telegraf.Logger ctx context.Context mainWG sync.WaitGroup @@ -80,10 +80,9 @@ func (plugin *T128Tank) Init() error { return err } - if plugin.Precision != config.Duration(time.Nanosecond) { + if plugin.Precision != nil { plugin.adjustTime = func(m telegraf.Metric) { - adjustedNano := m.Time().UnixNano() * int64(plugin.Precision) - m.SetTime(time.Unix(0, adjustedNano)) + m.SetTime(reinterpretTimestampPrecision(m.Time(), *plugin.Precision)) } } return nil @@ -94,18 +93,25 @@ func (plugin *T128Tank) Gather(_ telegraf.Accumulator) error { } func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { - if plugin.adjustTime == nil { + if plugin.adjustTime == nil && plugin.Precision == nil { unreasonableTimestamp := time.Unix(0, 0).Add(24 * time.Hour) + reinterpretPrecision := config.Duration(1 * time.Second) plugin.adjustTime = func(m telegraf.Metric) { mTime := m.Time() if mTime.Before(unreasonableTimestamp) { - adjustedTime := unreasonableTimestamp.Unix() * int64(plugin.Precision) - m.SetTime(time.Unix(adjustedTime, 0)) + m.SetTime(reinterpretTimestampPrecision(mTime, reinterpretPrecision)) } } } plugin.ctx, plugin.cancel = context.WithCancel(context.Background()) - reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, plugin.defaultStartingIndex, plugin.Log) + reader := NewReader( + plugin.ServerAddress, + plugin.PortNumber, + plugin.Topic, + plugin.IndexFile, + plugin.defaultStartingIndex, + plugin.Log, + ) plugin.mainWG.Add(1) go func() { defer plugin.mainWG.Done() @@ -123,7 +129,9 @@ func (plugin *T128Tank) Start(acc telegraf.Accumulator) error { if plugin.SequenceNumberField != "" { metric.AddField(plugin.SequenceNumberField, strconv.FormatUint(message.Index.value, 10)) } - plugin.adjustTime(metric) + if plugin.adjustTime != nil { + plugin.adjustTime(metric) + } acc.AddMetric(metric) } } @@ -181,6 +189,11 @@ func validateFrom(from string) error { return nil } +func reinterpretTimestampPrecision(current time.Time, precision config.Duration) time.Time { + adjustedNano := current.UnixNano() * int64(precision) + return time.Unix(0, adjustedNano) +} + func init() { inputs.Add("t128_tank", func() telegraf.Input { return &T128Tank{ diff --git a/plugins/inputs/t128_tank/t128_tank_test.go b/plugins/inputs/t128_tank/t128_tank_test.go index bab7cef69317f..061d72a604675 100644 --- a/plugins/inputs/t128_tank/t128_tank_test.go +++ b/plugins/inputs/t128_tank/t128_tank_test.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" ) @@ -251,21 +252,64 @@ func newMetric(name string, tags map[string]string, fields map[string]interface{ return m } -func TestUnreasonableTimestamp(t *testing.T) { +func TestPrecisionTimestamp(t *testing.T) { + nanosecond := config.Duration(1 * time.Nanosecond) + second := config.Duration(1 * time.Second) + + reasonableTimestamp, err := time.Parse(time.RFC3339Nano, "2023-01-01T10:15:23.578Z") + if !assert.NoError(t, err) { + return + } + reasonableTimestamp = reasonableTimestamp.UTC() + + reasonableSeconds := reasonableTimestamp.Unix() + incorrectlyInterpreted := time.Unix(reasonableSeconds/(10e9), reasonableSeconds%(10e9)).UTC() + testCases := []struct { - Name string - UnreasonableTimestamp time.Time - ExpectedMessage time.Time - Precision time.Duration + Name string + ConfiguredPrecision *config.Duration + ActualTimestamp int64 + ExpectedTimestamp time.Time }{ { - Name: "Precision in Nanosecond", - ExpectedMessage: time.Unix(24*60*60, 0), - Precision: time.Nanosecond, + Name: "Precision in Nanosecond", + ConfiguredPrecision: &nanosecond, + ActualTimestamp: reasonableTimestamp.UnixNano(), + ExpectedTimestamp: reasonableTimestamp, + }, + { + Name: "Precision in Nanosecond with unreasonable timestamp", + ConfiguredPrecision: &nanosecond, + ActualTimestamp: incorrectlyInterpreted.UnixNano(), + ExpectedTimestamp: incorrectlyInterpreted, + }, + { + Name: "Precision is Seconds", + ConfiguredPrecision: &second, + ActualTimestamp: reasonableTimestamp.Unix(), + ExpectedTimestamp: reasonableTimestamp.Truncate(1 * time.Second), + }, + { + Name: "Precision is Seconds with unreasonable timestamp", + ConfiguredPrecision: &second, + ActualTimestamp: incorrectlyInterpreted.Unix(), + ExpectedTimestamp: incorrectlyInterpreted.Truncate(1 * time.Second), }, { - Name: "Precision is Empty", - ExpectedMessage: time.Unix(0, 0), + Name: "Incorrectly Interpreted", + ConfiguredPrecision: &nanosecond, + ActualTimestamp: reasonableTimestamp.Unix(), + ExpectedTimestamp: incorrectlyInterpreted, + }, + { + Name: "Precision is empty with unreasonable timestamp", + ActualTimestamp: reasonableTimestamp.Unix(), + ExpectedTimestamp: reasonableTimestamp.Truncate(1 * time.Second), + }, + { + Name: "Precision is empty with reasonable timestamp", + ActualTimestamp: reasonableTimestamp.UnixNano(), + ExpectedTimestamp: reasonableTimestamp, }, } @@ -273,17 +317,31 @@ func TestUnreasonableTimestamp(t *testing.T) { t.Run(testCase.Name, func(t *testing.T) { fmt.Println(testCase.Name) var acc testutil.Accumulator - metric := newMetric("test_metric", nil, nil) plugin := &T128Tank{ Topic: "test", - Precision: config.Duration(testCase.Precision), + Log: testutil.Logger{}, + Precision: testCase.ConfiguredPrecision, + } + + if !assert.NoError(t, plugin.Init()) { + return } - plugin.Init() plugin.Start(&acc) - plugin.adjustTime(metric) - assert.Equal(t, testCase.ExpectedMessage, metric.Time()) + metricHandler := influx.NewMetricHandler() + metricParser := influx.NewParser(metricHandler) + metricParser.ParseLine(fmt.Sprintf("test_metric value=10i %v", testCase.ActualTimestamp)) + metric, err := metricHandler.Metric() + if !assert.NoError(t, err) { + return + } + + if plugin.adjustTime != nil { + plugin.adjustTime(metric) + } + + assert.Equal(t, testCase.ExpectedTimestamp, metric.Time().UTC()) }) } }