Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated t128_tank to parse the messages #63

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion plugins/inputs/t128_tank/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,23 @@ The tank input plugin collects data from a 128T.
[[inputs.t128_tank]]
## A (unique) file to use for index tracking.
## This tracking allows each event to be produced once.
# index_file = ""
# index-file = ""

## Required. The TANK topic to consume.
# topic = "events"

## Port Number to get tank data from.
# port_number = 11011

## A field name to display index number
# sequence_number_field = ""

## Server Address to get tank data from.
# server_address = "127.0.0.1"

## From specifies the first message we are interested in.
## If from is "start", it will start consuming from the
## first available message in the selected topic.
## If it is "end", it will tail the topic for newly produced messages.
# from = "end"
```
72 changes: 40 additions & 32 deletions plugins/inputs/t128_tank/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func (i index) next() index {
type CommandContext = func(ctx context.Context, name string, arg ...string) *exec.Cmd

type Reader struct {
topic string
lastSavedIndex uint64
topic string
lastObservedIndex chan uint64
// used to send events from the read routine to the send routine
sendChan chan []IndexedMessage
// the target address for the TANK instance
Expand All @@ -100,23 +100,21 @@ type Reader struct {
defaultIndex index
// telegraf Logger
log telegraf.Logger
// telegraf accumulator
acc telegraf.Accumulator
}

func NewReader(tankAddress string, tankPort int, topic string, indexPath string, defaultIndex index, log telegraf.Logger, acc telegraf.Accumulator) *Reader {
func NewReader(tankAddress string, tankPort int, topic string, indexPath string, defaultIndex index, log telegraf.Logger) *Reader {
return &Reader{
topic: topic,
tankReadCmdCtx: exec.CommandContext,
sendChan: make(chan []IndexedMessage),
readDone: make(chan error),
restartDelay: 5 * time.Second,
tankAddress: tankAddress,
tankPort: tankPort,
indexPath: indexPath,
defaultIndex: defaultIndex,
log: log,
acc: acc,
topic: topic,
lastObservedIndex: make(chan uint64, 1),
tankReadCmdCtx: exec.CommandContext,
sendChan: make(chan []IndexedMessage),
readDone: make(chan error),
restartDelay: 5 * time.Second,
tankAddress: tankAddress,
tankPort: tankPort,
indexPath: indexPath,
defaultIndex: defaultIndex,
log: log,
}
}

Expand All @@ -127,40 +125,50 @@ func (r *Reader) withTankReadCommandContext(tankReadCmdCtx CommandContext) *Read

func (r *Reader) Run(mainCtx context.Context) {
readCtx, readCtxCancel := context.WithCancel(mainCtx)
var observedValue uint64
lastIndex, err := r.getIndex(r.indexPath, r.defaultIndex)
r.lastSavedIndex = lastIndex.value
if err != nil {
r.log.Errorf("Error in get index %v", err)
readCtxCancel()
return
}

nextSaveCheck := time.After(2 * time.Second)
nextSaveCheck := time.NewTicker(2 * time.Second)
defer func() {
readCtxCancel()
r.setIndex(r.indexPath, index{value: r.lastSavedIndex})
close(r.lastObservedIndex)
observedValue, ok := <-r.lastObservedIndex
if ok {
r.setIndex(r.indexPath, index{value: observedValue})
} else {
r.setIndex(r.indexPath, index{value: lastIndex.value})
}
}()

go r.read(readCtx, lastIndex.next())

for {
select {
case <-mainCtx.Done():
r.log.Errorf("%s reader done", r.topic)
return
case <-nextSaveCheck:
if lastIndex.value > r.lastSavedIndex {
r.setIndex(r.indexPath, lastIndex)
r.lastSavedIndex = lastIndex.value
case observedValue = <-r.lastObservedIndex:
if lastIndex.value%1000 == 0 {
r.setIndex(r.indexPath, index{value: lastIndex.value})
}
nextSaveCheck = time.After(2 * time.Second)
case <-nextSaveCheck.C:
if observedValue > lastIndex.value {
lastIndex.value = observedValue
r.setIndex(r.indexPath, index{value: lastIndex.value})
}

case err := <-r.readDone:
var errBoundaryFault *boundaryFault
if err != nil && errors.As(err, &errBoundaryFault) {
r.log.Debugf("detected boundary fault, restarting %s tank read from index %d", r.topic, errBoundaryFault.nextAvailableIndex.value)
go r.read(readCtx, errBoundaryFault.nextAvailableIndex)
lastIndex = errBoundaryFault.nextAvailableIndex
go r.read(readCtx, lastIndex)
} else {
go r.read(readCtx, lastIndex.next())
lastIndex = lastIndex.next()
go r.read(readCtx, lastIndex)
}
}
}
Expand All @@ -179,10 +187,9 @@ func (r *Reader) getIndex(indexPath string, defaultIndex index) (index, error) {
} else if err != nil {
return defaultIndex, fmt.Errorf("encountered error reading index file, starting with default index %s: %s", defaultIndex.string(), err)
}

r.log.Debugf("found '%s' in index file", content)

index, err := newIndex(string(content))
newContent := strings.Split(string(content), "\n")[0]
r.log.Debugf("found '%s' in index file", newContent)
index, err := newIndex(newContent)
if err != nil {
return defaultIndex, fmt.Errorf("encountered error while parsing index file content, starting with default index %s: %s", defaultIndex.string(), err)
}
Expand Down Expand Up @@ -270,6 +277,7 @@ func (r *Reader) readFromTank(readCtx context.Context, startingIndex index) (err
var collectedMessages []IndexedMessage
for _, message := range messages {
collectedMessages = append(collectedMessages, *message)
r.lastObservedIndex <- message.Index.value
}
select {
case <-readCtx.Done():
Expand Down
133 changes: 96 additions & 37 deletions plugins/inputs/t128_tank/t128_tank.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package t128_tank

import (
"context"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)

const (
Expand All @@ -22,32 +24,42 @@ var sampleConfig = `
[[inputs.t128_tank]]
## A (unique) file to use for index tracking.
## This tracking allows each event to be produced once.
# index_file = ""
# index-file = ""

## Required. The TANK topic to consume.
# topic = "events"

## Port Number to get tank data from.
# port_number = 11011

## A field name to display index number
# sequence_number_field = ""

## Server Address to get tank data from.
# server_address = "127.0.0.1"
`

var (
typePattern = `,type=([^\s]+)`
recordTypePattern = `recordType=([^\s]+)`
)
## From specifies the first message we are interested in.
## If from is "start", it will start consuming from the
## first available message in the selected topic.
## If it is "end", it will tail the topic for newly produced messages.
# from = "end"
`

type T128Tank struct {
IndexFile string `toml:"index_file"`
Topic string `toml:"topic"`
PortNumber int `toml:"port_number"`
ServerAddress string `toml:"server_address"`
Log telegraf.Logger
ctx context.Context
mainWG sync.WaitGroup
cancel context.CancelFunc
IndexFile string `toml:"index-file"`
Topic string `toml:"topic"`
PortNumber int `toml:"port_number"`
ServerAddress string `toml:"server_address"`
SequenceNumberField string `toml:"sequence_number_field"`
From string `toml:"from"`
Precision *config.Duration `toml:"data_precision"`
Log telegraf.Logger
ctx context.Context
mainWG sync.WaitGroup
cancel context.CancelFunc
parser parsers.Parser
defaultStartingIndex index
adjustTime func(telegraf.Metric)
}

func (*T128Tank) SampleConfig() string {
Expand All @@ -58,12 +70,21 @@ func (*T128Tank) Description() string {
return "Run TANK as a long-running input plugin"
}

func (plugin *T128Tank) SetParser(parser parsers.Parser) {
plugin.parser = parser
}

func (plugin *T128Tank) Init() error {
err := plugin.checkConfig()
if err != nil {
return err
}

if plugin.Precision != nil {
plugin.adjustTime = func(m telegraf.Metric) {
m.SetTime(reinterpretTimestampPrecision(m.Time(), *plugin.Precision))
}
}
return nil
}

Expand All @@ -72,8 +93,25 @@ func (plugin *T128Tank) Gather(_ telegraf.Accumulator) error {
}

func (plugin *T128Tank) Start(acc telegraf.Accumulator) error {
if plugin.adjustTime == nil && plugin.Precision == nil {
unreasonableTimestamp := time.Unix(0, 0).Add(24 * time.Hour)
reinterpretPrecision := config.Duration(1 * time.Second)
plugin.adjustTime = func(m telegraf.Metric) {
mTime := m.Time()
if mTime.Before(unreasonableTimestamp) {
m.SetTime(reinterpretTimestampPrecision(mTime, reinterpretPrecision))
}
}
}
plugin.ctx, plugin.cancel = context.WithCancel(context.Background())
reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, StartIndex, plugin.Log, acc)
reader := NewReader(
plugin.ServerAddress,
plugin.PortNumber,
plugin.Topic,
plugin.IndexFile,
plugin.defaultStartingIndex,
plugin.Log,
)
plugin.mainWG.Add(1)
go func() {
defer plugin.mainWG.Done()
Expand All @@ -83,20 +121,19 @@ func (plugin *T128Tank) Start(acc telegraf.Accumulator) error {
return
case messages := <-reader.sendChan:
for _, message := range messages {

tags := map[string]string{
"index": strconv.FormatUint(message.Index.value, 10),
metrics, err := plugin.parser.Parse(message.Message)
if err != nil {
acc.AddError(err)
}
if strings.ToLower(plugin.Topic) == "events" {
messageType := extractTopicType(string(message.Message), typePattern)
tags["type"] = messageType
} else if strings.ToLower(plugin.Topic) == "session_records" {
messageType := extractTopicType(string(message.Message), recordTypePattern)
tags["recordType"] = messageType
for _, metric := range metrics {
if plugin.SequenceNumberField != "" {
metric.AddField(plugin.SequenceNumberField, strconv.FormatUint(message.Index.value, 10))
}
if plugin.adjustTime != nil {
plugin.adjustTime(metric)
}
acc.AddMetric(metric)
}
acc.AddFields("t128_tank", map[string]interface{}{
"message": string(message.Message),
}, tags, time.Now())
}
}
}
Expand All @@ -118,23 +155,45 @@ func (plugin *T128Tank) Stop() {
plugin.mainWG.Wait()
}

func extractTopicType(message string, pattern string) string {
regex := regexp.MustCompile(pattern)
match := regex.FindStringSubmatch(message)
if len(match) > 1 {
return match[1]
}
return ""
}

func (plugin *T128Tank) checkConfig() error {
if plugin.Topic == "" {
return fmt.Errorf("topic is a required configuration field")
}

if plugin.From != "" {
err := validateFrom(plugin.From)
if err != nil {
return err
}
}

if strings.ToLower(plugin.From) == "end" {
plugin.defaultStartingIndex = EndIndex
} else if strings.ToLower(plugin.From) == "start" {
plugin.defaultStartingIndex = StartIndex
} else {
if plugin.IndexFile == "" {
plugin.defaultStartingIndex = EndIndex
} else {
plugin.defaultStartingIndex = StartIndex
}
}
return nil
}

func validateFrom(from string) error {
fromLower := strings.ToLower(from)
if fromLower != "start" && fromLower != "end" {
return errors.New("Invalid from value. Accepted values are 'start' or 'end'.")
}
return nil
}

func reinterpretTimestampPrecision(current time.Time, precision config.Duration) time.Time {
adjustedNano := current.UnixNano() * int64(precision)
return time.Unix(0, adjustedNano)
}

func init() {
inputs.Add("t128_tank", func() telegraf.Input {
return &T128Tank{
Expand Down
Loading