Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
WAN-2496 #time 15m
  • Loading branch information
shriyanshk128T committed Oct 25, 2023
1 parent 198e280 commit 4e05389
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 21 deletions.
11 changes: 6 additions & 5 deletions plugins/inputs/t128_tank/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (r *Reader) withTankReadCommandContext(tankReadCmdCtx CommandContext) *Read

func (r *Reader) Run(mainCtx context.Context) {
readCtx, readCtxCancel := context.WithCancel(mainCtx)
var observedValue uint64
lastIndex, err := r.getIndex(r.indexPath, r.defaultIndex)
if err != nil {
r.log.Errorf("Error in get index %v", err)
Expand All @@ -145,20 +146,20 @@ func (r *Reader) Run(mainCtx context.Context) {
go r.read(readCtx, lastIndex.next())

for {

select {
case <-mainCtx.Done():
r.log.Errorf("%s reader done", r.topic)
return
case observedValue := <-r.lastObservedIndex:
if lastIndex.value < observedValue {
lastIndex.value = observedValue
case observedValue = <-r.lastObservedIndex:
if lastIndex.value%1000 == 0 {
r.setIndex(r.indexPath, index{value: lastIndex.value})
}
case <-nextSaveCheck.C:
if lastIndex.value%1000 == 0 {
if observedValue > lastIndex.value {
lastIndex.value = observedValue
r.setIndex(r.indexPath, index{value: lastIndex.value})
}

case err := <-r.readDone:
var errBoundaryFault *boundaryFault
if err != nil && errors.As(err, &errBoundaryFault) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/t128_tank/t128_tank.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (plugin *T128Tank) Start(acc telegraf.Accumulator) error {
plugin.adjustTime = func(m telegraf.Metric) {
mTime := m.Time()
if mTime.Before(unreasonableTimestamp) {
adjustedTime := unreasonableTimestamp.Unix()
adjustedTime := unreasonableTimestamp.Unix() * int64(plugin.Precision)
m.SetTime(time.Unix(adjustedTime, 0))
}
}
Expand Down
19 changes: 4 additions & 15 deletions plugins/inputs/t128_tank/t128_tank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func newMetric(name string, tags map[string]string, fields map[string]interface{
if fields == nil {
fields = map[string]interface{}{}
}
m := metric.New(name, tags, fields, time.Now())
m := metric.New(name, tags, fields, time.Date(1960, time.October, 25, 12, 0, 0, 0, time.UTC))
return m
}

Expand All @@ -272,29 +272,18 @@ func TestUnreasonableTimestamp(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
fmt.Println(testCase.Name)
var acc testutil.Accumulator
metric := newMetric("test_metric", nil, nil)
metric.SetTime(testCase.UnreasonableTimestamp)

plugin := &T128Tank{
Topic: "test",
Precision: config.Duration(testCase.Precision),
}
plugin.Init()
if plugin.adjustTime == nil {
unreasonableTimestamp := time.Unix(0, 0).Add(24 * time.Hour)
plugin.adjustTime = func(m telegraf.Metric) {
mTime := m.Time()
if mTime.Before(unreasonableTimestamp) {
adjustedSeconds := unreasonableTimestamp.Unix()
m.SetTime(time.Unix(adjustedSeconds, 0))
}
}
}
plugin.Start(&acc)
plugin.adjustTime(metric)

if !metric.Time().Equal(testCase.ExpectedMessage) {
t.Errorf("Expected time: %s, Actual time: %s", testCase.ExpectedMessage, metric.Time())
}
assert.Equal(t, testCase.ExpectedMessage, metric.Time())
})
}
}

0 comments on commit 4e05389

Please sign in to comment.