Skip to content

Commit

Permalink
E2E: Add individual server owned datastream tests
Browse files Browse the repository at this point in the history
Signed-off-by: Arnaldo Cesco <arnaldo.cesco@secomind.com>
  • Loading branch information
Annopaolo committed Nov 28, 2022
1 parent bd59647 commit 526b718
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 3 deletions.
125 changes: 122 additions & 3 deletions e2e_tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/stretchr/testify/suite"
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/astarte-platform/astarte-go/client"
"github.com/astarte-platform/astarte-go/interfaces"
Expand Down Expand Up @@ -224,6 +225,41 @@ func (suite *EndToEndSuite) TestPropertiesIndividualDevice() {
}
suite.Empty(res2, "Properties not unset")
}
func (suite *EndToEndSuite) TestDatastreamIndividualServer() {
//push data to Astarte, OnIndividualMessageReceived will take care of comparison with expected values
for k, v := range expectedDatastreamIndividual {
if k == "/test/datetimearray" {

} else if err := suite.astarteAPIClient.AppEngine.SendDatastream(suite.realm, suite.deviceID, client.AstarteDeviceID,
"org.astarte-platform.server.individual.datastream.Everything", k, v); err != nil {
suite.Fail("Error pushing message", err)
}
fmt.Printf("Pushed %v on %s , unless it was on \"/test/datetimearray\"\n", v, k)
}
}

func (suite *EndToEndSuite) checkReceivedDatastreamIndividualServer(d *device.Device, message device.IndividualMessage) {
if message.Interface.Name != "org.astarte-platform.server.individual.datastream.Everything" {
suite.Fail("Received message from unexpected interface", message.Interface.Name)
}
found := false
for expectedPath, expectedValue := range expectedDatastreamIndividual {
if expectedPath == message.Path {
found = true
astarteType := strings.Split(expectedPath, "/")[2]
receivedValue := primitiveValueToAstarteType(message.Value, astarteType)
if !suite.Equal(expectedValue, receivedValue) {
fmt.Printf("Expected: %v : %v ---- received %v : %v\n",
expectedValue, reflect.TypeOf(expectedValue),
receivedValue, reflect.TypeOf(receivedValue))
suite.Fail("Sent value different from received value", expectedValue, receivedValue)
}
}
}
if !found {
suite.Fail("Received message from unexpected path", message.Path)
}
}

// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
Expand Down Expand Up @@ -267,9 +303,8 @@ func (suite *EndToEndSuite) setupDevice() {
d.OnConnectionStateChanged = func(d *device.Device, state bool) {
fmt.Printf("Device connection state: %t\n", state)
}
d.OnIndividualMessageReceived = func(d *device.Device, message device.IndividualMessage) {
fmt.Printf("Individual message received: %v\n", message)
}
d.OnIndividualMessageReceived = suite.checkReceivedDatastreamIndividualServer

d.OnAggregateMessageReceived = func(d *device.Device, message device.AggregateMessage) {
fmt.Printf("Aggregate message received: %v\n", message)
}
Expand Down Expand Up @@ -359,6 +394,81 @@ func individualValueToAstarteType(value interface{}, astarteType string) interfa
}
}

//nolint
func primitiveValueToAstarteType(value interface{}, astarteType string) interface{} {
// cast like there's no tomorrow yolo
switch astarteType {
case "datetime":
return value.(primitive.DateTime).Time().UTC()
case "datetimearray":
n := []time.Time{}
pa, _ := value.(primitive.A)
for _, v := range []interface{}(pa) {
n = append(n, v.(primitive.DateTime).Time().UTC())
}
return n
case "integer":
return int(value.(int32))
case "integerarray":
n := []int{}
pa, _ := value.(primitive.A)
for _, v := range []interface{}(pa) {
n = append(n, int(v.(int32)))
}
return n
case "double":
return value.(float64)
case "doublearray":
n := []float64{}
pa, _ := value.(primitive.A)
for _, v := range []interface{}(pa) {
n = append(n, toFloat64(v))
}
return n
case "longinteger":
return value.(int64)
case "longintegerarray":
n := []int64{}
pa, _ := value.(primitive.A)
for _, v := range []interface{}(pa) {
intV := v.(int64)
n = append(n, intV)
}
return n
case "boolean":
return value.(bool)
case "booleanarray":
n := []bool{}
pa, _ := value.(primitive.A)
for _, v := range []interface{}(pa) {
n = append(n, v.(bool))
}
return n
case "string":
return value.(string)
case "stringarray":
n := []string{}
pa, _ := value.(primitive.A)
for _, v := range []interface{}(pa) {
n = append(n, v.(string))
}
return n
case "binaryblob":
return value.(primitive.Binary).Data
case "binaryblobarray":
n := [][]byte{}
pa, _ := value.(primitive.A)
for _, v := range []interface{}(pa) {
decoded := v.(primitive.Binary).Data
n = append(n, decoded)
}
return n
default:
// can't happen because we checked all astarte types
return nil
}
}

func toDate(value interface{}) time.Time {
date, _ := time.ParseInLocation(time.RFC3339, fmt.Sprintf("%s", value), time.UTC)
return date
Expand All @@ -374,6 +484,15 @@ func toLongInteger(value interface{}) int64 {
return int64(v.(float64))
}
}
func toFloat64(value interface{}) float64 {
switch v := value.(type) {
case int32:
return float64(v)
// Assuming that, if it is not a string, it can be casted to a float64
default:
return v.(float64)
}
}

func loadInterfaces(d *device.Device, interfaceDirectory string) error {
files, _ := ioutil.ReadDir(interfaceDirectory)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
{
"interface_name": "org.astarte-platform.server.individual.datastream.Everything",
"version_major": 0,
"version_minor": 1,
"type": "datastream",
"ownership": "server",
"mappings": [
{
"endpoint": "/%{sensor_id}/binaryblob",
"type": "binaryblob",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/binaryblobarray",
"type": "binaryblobarray",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/boolean",
"type": "boolean",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/booleanarray",
"type": "booleanarray",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/datetime",
"type": "datetime",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/datetimearray",
"type": "datetimearray",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/double",
"type": "double",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/doublearray",
"type": "doublearray",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/integer",
"type": "integer",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/integerarray",
"type": "integerarray",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/longinteger",
"type": "longinteger",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/longintegerarray",
"type": "longintegerarray",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/string",
"type": "string",
"explicit_timestamp": true
},
{
"endpoint": "/%{sensor_id}/stringarray",
"type": "stringarray",
"explicit_timestamp": true
}
]
}

0 comments on commit 526b718

Please sign in to comment.