Skip to content

Commit

Permalink
feat: Implement AutoEvent onChangeThreshold
Browse files Browse the repository at this point in the history
The onChange flag in Device AutoEvent can prevent any non changed values sent out. The onChangeThreshold could provide the advanced feature, and the default value is 0, any changed value that exceeds (>) bounds shall be published.
For example, Current_value - Current_prev > 0.01 shall be published where "Current_value" is the new value and "Current_prev" is the previously published value.

Signed-off-by: bruce <weichou1229@gmail.com>
  • Loading branch information
weichou1229 committed Nov 1, 2024
1 parent a0938cd commit 44a1270
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ go 1.23
require (
github.com/OneOfOne/xxhash v1.2.8
github.com/edgexfoundry/go-mod-bootstrap/v4 v4.0.0-dev.2
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.2
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.3
github.com/edgexfoundry/go-mod-messaging/v4 v4.0.0-dev.3
github.com/google/uuid v1.6.0
github.com/hashicorp/go-multierror v1.1.1
github.com/labstack/echo/v4 v4.12.0
github.com/panjf2000/ants/v2 v2.10.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/spf13/cast v1.7.0
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.30.0
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -98,7 +99,6 @@ require (
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/speps/go-hashids v2.0.0+incompatible // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/spiffe/go-spiffe/v2 v2.4.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v4 v4.0.0-dev.2 h1:T5iCk8PqEdrzgnz6G9xt
github.com/edgexfoundry/go-mod-bootstrap/v4 v4.0.0-dev.2/go.mod h1:54WyXiygNbIfITqLVGXU8nOatZh7pMAyO3NQXOuPr5s=
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.3 h1:3SdjghkEqos8AySKmz+ehjmI1HP/EmnRaFwNTf0rbyc=
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.3/go.mod h1:s/pjxzTfqbsH1s4KyvefhOYmVNc9RvK6sI4x4SGI8Tk=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.2 h1:BEJKSvyW+dMTW/yzEKWjs0tGUZnMkFPYX4eypyoG0IY=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.2/go.mod h1:I3EG+Tg/gcVSUJ+IJDuvVKFISnRu8oQtMXqltE1rzT8=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.3 h1:BYdXlS/dLNegB+kT+qKbDgsXv/NhSrigMpomLNl9N5Q=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.3/go.mod h1:I3EG+Tg/gcVSUJ+IJDuvVKFISnRu8oQtMXqltE1rzT8=
github.com/edgexfoundry/go-mod-messaging/v4 v4.0.0-dev.3 h1:FRpec371q4CnRBol0E4utB0BHZLVu146JtCAhau9ujQ=
github.com/edgexfoundry/go-mod-messaging/v4 v4.0.0-dev.3/go.mod h1:eAmCHilZWXL0skB9Frnm2kZTeY81sF6xKOmePoWKTNE=
github.com/edgexfoundry/go-mod-registry/v4 v4.0.0-dev.2 h1:iHu8JPpmrEOrIZdv0iYW69FlMmkyal/FpbXtC3pHt2c=
Expand Down
50 changes: 32 additions & 18 deletions internal/autoevent/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package autoevent
import (
"context"
"fmt"
"math"
"sync"
"time"

Expand All @@ -24,17 +25,20 @@ import (

"github.com/edgexfoundry/device-sdk-go/v4/internal/application"
sdkCommon "github.com/edgexfoundry/device-sdk-go/v4/internal/common"

"github.com/spf13/cast"
)

type Executor struct {
deviceName string
sourceName string
onChange bool
lastReadings map[string]interface{}
duration time.Duration
stop bool
mutex *sync.Mutex
pool *ants.Pool
deviceName string
sourceName string
onChange bool
onChangeThreshold float64
lastReadings map[string]interface{}
duration time.Duration
stop bool
mutex *sync.Mutex
pool *ants.Pool
}

// Run triggers this Executor executes the handler for the event source periodically
Expand All @@ -61,7 +65,7 @@ func (e *Executor) Run(ctx context.Context, wg *sync.WaitGroup, buffer chan bool
if evt != nil {
if e.onChange {
if e.compareReadings(evt.Readings) {
lc.Debugf("AutoEvent - readings are the same as previous one")
lc.Debugf("AutoEvent - source '%s' readings are the same as previous one", e.sourceName)
continue
}
}
Expand Down Expand Up @@ -109,13 +113,22 @@ func (e *Executor) compareReadings(readings []dtos.BaseReading) bool {
var result = true
for _, reading := range readings {
if lastReading, ok := e.lastReadings[reading.ResourceName]; ok {
if reading.ValueType == common.ValueTypeBinary {
switch reading.ValueType {
case common.ValueTypeBinary:
checksum := xxhash.Checksum64(reading.BinaryValue)
if lastReading != checksum {
e.lastReadings[reading.ResourceName] = checksum
result = false
}
} else {
case common.ValueTypeUint8, common.ValueTypeUint16, common.ValueTypeUint32, common.ValueTypeUint64,
common.ValueTypeInt8, common.ValueTypeInt16, common.ValueTypeInt32, common.ValueTypeInt64,
common.ValueTypeFloat32, common.ValueTypeFloat64:
t := cast.ToFloat64(lastReading) - cast.ToFloat64(reading.Value)
if math.Abs(t) > e.onChangeThreshold {
e.lastReadings[reading.ResourceName] = reading.Value
result = false
}
default:
if lastReading != reading.Value {
e.lastReadings[reading.ResourceName] = reading.Value
result = false
Expand Down Expand Up @@ -155,12 +168,13 @@ func NewExecutor(deviceName string, ae models.AutoEvent, pool *ants.Pool) (*Exec
}

return &Executor{
deviceName: deviceName,
sourceName: ae.SourceName,
onChange: ae.OnChange,
duration: duration,
stop: false,
mutex: &sync.Mutex{},
pool: pool,
deviceName: deviceName,
sourceName: ae.SourceName,
onChange: ae.OnChange,
onChangeThreshold: ae.OnChangeThreshold,
duration: duration,
stop: false,
mutex: &sync.Mutex{},
pool: pool,
}, nil
}
56 changes: 56 additions & 0 deletions internal/autoevent/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,59 @@ func TestCompareReadings(t *testing.T) {
})
}
}

func TestOnChangeThreshold(t *testing.T) {
deviceName := "testDevice"
resourceName := "testResource"
profileName := "testProfile"
autoEvent := models.AutoEvent{SourceName: resourceName, OnChange: true, Interval: "500ms"}
pool, err := ants.NewPool(runtime.GOMAXPROCS(0), ants.WithNonblocking(true))
require.NoError(t, err)
e, err := NewExecutor(deviceName, autoEvent, pool)
require.NoError(t, err)

tests := []struct {
name string
valueType string
lastReadingValue any
currentReadingValue any
onChangeThreshold float64
expectUnchanged bool
}{
{"float32 unchanged is true", common.ValueTypeFloat32, float32(0), float32(0.01), 0.01, true},
{"float32 unchanged is false", common.ValueTypeFloat32, float32(0), float32(0.02), 0.01, false},
{"float64 unchanged is true", common.ValueTypeFloat64, float64(0), float64(0.01), 0.01, true},
{"float64 unchanged is false", common.ValueTypeFloat64, float64(0), float64(0.02), 0.01, false},
{"uint8 unchanged is true", common.ValueTypeUint8, uint8(0), uint8(1), 1, true},
{"uint8 unchanged is false", common.ValueTypeUint8, uint8(0), uint8(2), 1, false},
{"uint16 unchanged is true", common.ValueTypeUint16, uint16(0), uint16(1), 1, true},
{"uint16 unchanged is false", common.ValueTypeUint16, uint16(0), uint16(2), 1, false},
{"uint32 unchanged is true", common.ValueTypeUint32, uint32(0), uint32(1), 1, true},
{"uint32 unchanged is false", common.ValueTypeUint32, uint32(0), uint32(2), 1, false},
{"uint64 unchanged is true", common.ValueTypeUint64, uint64(0), uint64(1), 1, true},
{"uint64 unchanged is false", common.ValueTypeUint64, uint64(0), uint64(2), 1, false},
{"int8 unchanged is true", common.ValueTypeInt8, int8(0), int8(1), 1, true},
{"int8 unchanged is false", common.ValueTypeInt8, int8(0), int8(2), 1, false},
{"int16 unchanged is true", common.ValueTypeInt16, int16(0), int16(1), 1, true},
{"int16 unchanged is false", common.ValueTypeInt16, int16(0), int16(2), 1, false},
{"int32 unchanged is true", common.ValueTypeInt32, int32(0), int32(1), 1, true},
{"int32 unchanged is false", common.ValueTypeInt32, int32(0), int32(2), 1, false},
{"int64 unchanged is true", common.ValueTypeInt64, int64(0), int64(1), 1, true},
{"int64 unchanged is false", common.ValueTypeInt64, int64(0), int64(2), 1, false},
}

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
lastReading, err := dtos.NewSimpleReading(profileName, deviceName, resourceName, testCase.valueType, testCase.lastReadingValue)
require.NoError(t, err)
currentReading, err := dtos.NewSimpleReading(profileName, deviceName, resourceName, testCase.valueType, testCase.currentReadingValue)
require.NoError(t, err)
e.lastReadings = map[string]any{lastReading.ResourceName: lastReading.Value}
e.onChangeThreshold = testCase.onChangeThreshold

res := e.compareReadings([]dtos.BaseReading{currentReading})

assert.Equal(t, testCase.expectUnchanged, res, "compareReading result not as expected")
})
}
}

0 comments on commit 44a1270

Please sign in to comment.