Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(data): Remove created field from Event and Reading #3299

Merged
merged 2 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.0-dev.20
github.com/edgexfoundry/go-mod-configuration/v2 v2.0.0-dev.4
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0-dev.57
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0-dev.60
github.com/edgexfoundry/go-mod-messaging/v2 v2.0.0-dev.6
github.com/edgexfoundry/go-mod-registry/v2 v2.0.0-dev.3
github.com/edgexfoundry/go-mod-secrets/v2 v2.0.0-dev.10
Expand Down
40 changes: 17 additions & 23 deletions internal/core/data/v2/application/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ var persistedEvent = models.Event{
Id: testUUIDString,
DeviceName: testDeviceName,
SourceName: testSourceName,
Created: testCreatedTime,
Origin: testOriginTime,
Readings: buildReadings(),
}
Expand All @@ -53,8 +52,7 @@ func buildReadings() []models.Reading {
r1 := models.SimpleReading{
BaseReading: models.BaseReading{
Id: uuid.New().String(),
Created: ticks,
Origin: testOriginTime,
Origin: ticks,
DeviceName: testDeviceName,
ResourceName: testDeviceResourceName,
ProfileName: "TempProfile",
Expand All @@ -66,8 +64,7 @@ func buildReadings() []models.Reading {
r2 := models.BinaryReading{
BaseReading: models.BaseReading{
Id: uuid.New().String(),
Created: ticks + 20,
Origin: testOriginTime,
Origin: ticks + 20,
DeviceName: testDeviceName,
ResourceName: testDeviceResourceName,
ProfileName: "FileDataProfile",
Expand All @@ -79,8 +76,7 @@ func buildReadings() []models.Reading {
r3 := models.SimpleReading{
BaseReading: models.BaseReading{
Id: uuid.New().String(),
Created: ticks + 30,
Origin: testOriginTime,
Origin: ticks + 30,
DeviceName: testDeviceName,
ResourceName: testDeviceResourceName,
ProfileName: "TempProfile",
Expand All @@ -92,8 +88,7 @@ func buildReadings() []models.Reading {
r4 := models.SimpleReading{
BaseReading: models.BaseReading{
Id: uuid.New().String(),
Created: ticks + 40,
Origin: testOriginTime,
Origin: ticks + 40,
DeviceName: testDeviceName,
ResourceName: testDeviceResourceName,
ProfileName: "TempProfile",
Expand All @@ -105,8 +100,7 @@ func buildReadings() []models.Reading {
r5 := models.SimpleReading{
BaseReading: models.BaseReading{
Id: uuid.New().String(),
Created: ticks + 50,
Origin: testOriginTime,
Origin: ticks + 50,
DeviceName: testDeviceName,
ResourceName: testDeviceResourceName,
ProfileName: "TempProfile",
Expand Down Expand Up @@ -397,20 +391,20 @@ func TestDeleteEventsByDeviceName(t *testing.T) {
func TestEventsByTimeRange(t *testing.T) {
event1 := persistedEvent
event2 := persistedEvent
event2.Created = event2.Created + 20
event2.Origin = event2.Origin + 20
event3 := persistedEvent
event3.Created = event3.Created + 30
event3.Origin = event3.Origin + 30
event4 := persistedEvent
event4.Created = event4.Created + 40
event4.Origin = event4.Origin + 40
event5 := persistedEvent
event5.Created = event5.Created + 50
event5.Origin = event5.Origin + 50

dic := mocks.NewMockDIC()
dbClientMock := &dbMock.DBClient{}
dbClientMock.On("EventsByTimeRange", int(event1.Created), int(event5.Created), 0, 10).Return([]models.Event{event5, event4, event3, event2, event1}, nil)
dbClientMock.On("EventsByTimeRange", int(event2.Created), int(event4.Created), 0, 10).Return([]models.Event{event4, event3, event2}, nil)
dbClientMock.On("EventsByTimeRange", int(event2.Created), int(event4.Created), 1, 2).Return([]models.Event{event3, event2}, nil)
dbClientMock.On("EventsByTimeRange", int(event2.Created), int(event4.Created), 4, 2).Return(nil, errors.NewCommonEdgeX(errors.KindRangeNotSatisfiable, "query objects bounds out of range", nil))
dbClientMock.On("EventsByTimeRange", int(event1.Origin), int(event5.Origin), 0, 10).Return([]models.Event{event5, event4, event3, event2, event1}, nil)
dbClientMock.On("EventsByTimeRange", int(event2.Origin), int(event4.Origin), 0, 10).Return([]models.Event{event4, event3, event2}, nil)
dbClientMock.On("EventsByTimeRange", int(event2.Origin), int(event4.Origin), 1, 2).Return([]models.Event{event3, event2}, nil)
dbClientMock.On("EventsByTimeRange", int(event2.Origin), int(event4.Origin), 4, 2).Return(nil, errors.NewCommonEdgeX(errors.KindRangeNotSatisfiable, "query objects bounds out of range", nil))
dic.Update(di.ServiceConstructorMap{
v2DataContainer.DBClientInterfaceName: func(get di.Get) interface{} {
return dbClientMock
Expand All @@ -428,10 +422,10 @@ func TestEventsByTimeRange(t *testing.T) {
expectedCount int
expectedStatusCode int
}{
{"Valid - all events", int(event1.Created), int(event5.Created), 0, 10, false, "", 5, http.StatusOK},
{"Valid - events trimmed by latest and oldest", int(event2.Created), int(event4.Created), 0, 10, false, "", 3, http.StatusOK},
{"Valid - events trimmed by latest and oldest and skipped first", int(event2.Created), int(event4.Created), 1, 2, false, "", 2, http.StatusOK},
{"Invalid - bounds out of range", int(event2.Created), int(event4.Created), 4, 2, true, errors.KindRangeNotSatisfiable, 0, http.StatusRequestedRangeNotSatisfiable},
{"Valid - all events", int(event1.Origin), int(event5.Origin), 0, 10, false, "", 5, http.StatusOK},
{"Valid - events trimmed by latest and oldest", int(event2.Origin), int(event4.Origin), 0, 10, false, "", 3, http.StatusOK},
{"Valid - events trimmed by latest and oldest and skipped first", int(event2.Origin), int(event4.Origin), 1, 2, false, "", 2, http.StatusOK},
{"Invalid - bounds out of range", int(event2.Origin), int(event4.Origin), 4, 2, true, errors.KindRangeNotSatisfiable, 0, http.StatusRequestedRangeNotSatisfiable},
}
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
Expand Down
16 changes: 8 additions & 8 deletions internal/core/data/v2/application/reading_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func TestReadingsByTimeRange(t *testing.T) {

dic := mocks.NewMockDIC()
dbClientMock := &dbMock.DBClient{}
dbClientMock.On("ReadingsByTimeRange", int(readings[0].GetBaseReading().Created), int(readings[4].GetBaseReading().Created), 0, 10).Return(readings, nil)
dbClientMock.On("ReadingsByTimeRange", int(readings[1].GetBaseReading().Created), int(readings[3].GetBaseReading().Created), 0, 10).Return([]models.Reading{readings[3], readings[2], readings[1]}, nil)
dbClientMock.On("ReadingsByTimeRange", int(readings[1].GetBaseReading().Created), int(readings[3].GetBaseReading().Created), 1, 2).Return([]models.Reading{readings[2], readings[1]}, nil)
dbClientMock.On("ReadingsByTimeRange", int(readings[1].GetBaseReading().Created), int(readings[3].GetBaseReading().Created), 4, 2).Return(nil, errors.NewCommonEdgeX(errors.KindRangeNotSatisfiable, "query objects bounds out of range", nil))
dbClientMock.On("ReadingsByTimeRange", int(readings[0].GetBaseReading().Origin), int(readings[4].GetBaseReading().Origin), 0, 10).Return(readings, nil)
dbClientMock.On("ReadingsByTimeRange", int(readings[1].GetBaseReading().Origin), int(readings[3].GetBaseReading().Origin), 0, 10).Return([]models.Reading{readings[3], readings[2], readings[1]}, nil)
dbClientMock.On("ReadingsByTimeRange", int(readings[1].GetBaseReading().Origin), int(readings[3].GetBaseReading().Origin), 1, 2).Return([]models.Reading{readings[2], readings[1]}, nil)
dbClientMock.On("ReadingsByTimeRange", int(readings[1].GetBaseReading().Origin), int(readings[3].GetBaseReading().Origin), 4, 2).Return(nil, errors.NewCommonEdgeX(errors.KindRangeNotSatisfiable, "query objects bounds out of range", nil))
dic.Update(di.ServiceConstructorMap{
v2DataContainer.DBClientInterfaceName: func(get di.Get) interface{} {
return dbClientMock
Expand All @@ -81,10 +81,10 @@ func TestReadingsByTimeRange(t *testing.T) {
expectedCount int
expectedStatusCode int
}{
{"Valid - all readings", int(readings[0].GetBaseReading().Created), int(readings[4].GetBaseReading().Created), 0, 10, false, "", 5, http.StatusOK},
{"Valid - readings trimmed by latest and oldest", int(readings[1].GetBaseReading().Created), int(readings[3].GetBaseReading().Created), 0, 10, false, "", 3, http.StatusOK},
{"Valid - readings trimmed by latest and oldest and skipped first", int(readings[1].GetBaseReading().Created), int(readings[3].GetBaseReading().Created), 1, 2, false, "", 2, http.StatusOK},
{"Invalid - bounds out of range", int(readings[1].GetBaseReading().Created), int(readings[3].GetBaseReading().Created), 4, 2, true, errors.KindRangeNotSatisfiable, 0, http.StatusRequestedRangeNotSatisfiable},
{"Valid - all readings", int(readings[0].GetBaseReading().Origin), int(readings[4].GetBaseReading().Origin), 0, 10, false, "", 5, http.StatusOK},
{"Valid - readings trimmed by latest and oldest", int(readings[1].GetBaseReading().Origin), int(readings[3].GetBaseReading().Origin), 0, 10, false, "", 3, http.StatusOK},
{"Valid - readings trimmed by latest and oldest and skipped first", int(readings[1].GetBaseReading().Origin), int(readings[3].GetBaseReading().Origin), 1, 2, false, "", 2, http.StatusOK},
{"Invalid - bounds out of range", int(readings[1].GetBaseReading().Origin), int(readings[3].GetBaseReading().Origin), 4, 2, true, errors.KindRangeNotSatisfiable, 0, http.StatusRequestedRangeNotSatisfiable},
}
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions internal/core/data/v2/controller/http/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ var testAddEvent = requests.AddEventRequest{
var persistedReading = models.SimpleReading{
BaseReading: models.BaseReading{
Id: ExampleUUID,
Created: TestCreatedTime,
Origin: TestOriginTime,
DeviceName: TestDeviceName,
ResourceName: TestDeviceResourceName,
Expand All @@ -81,7 +80,6 @@ var persistedEvent = models.Event{
Id: expectedEventId,
DeviceName: TestDeviceName,
ProfileName: TestDeviceProfileName,
Created: TestCreatedTime,
Origin: TestOriginTime,
Readings: []models.Reading{persistedReading},
}
Expand Down
28 changes: 11 additions & 17 deletions internal/pkg/v2/infrastructure/redis/event.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2020 IOTech Ltd
// Copyright (C) 2020-2021 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -9,19 +9,18 @@ import (
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/edgexfoundry/edgex-go/internal/pkg/common"
"github.com/edgexfoundry/edgex-go/internal/pkg/v2/utils"
"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
"github.com/edgexfoundry/go-mod-core-contracts/v2/v2"
"github.com/edgexfoundry/go-mod-core-contracts/v2/v2/models"

"github.com/gomodule/redigo/redis"
)

const (
EventsCollection = "cd|evt"
EventsCollectionCreated = EventsCollection + DBKeySeparator + v2.Created
EventsCollectionOrigin = EventsCollection + DBKeySeparator + v2.Origin
EventsCollectionDeviceName = EventsCollection + DBKeySeparator + v2.Device + DBKeySeparator + v2.Name
EventsCollectionReadings = EventsCollection + DBKeySeparator + "readings"
)
Expand Down Expand Up @@ -54,7 +53,7 @@ func (c *Client) asyncDeleteEventsByIds(eventIds []string) {
_ = conn.Send(UNLINK, storedKey)
_ = conn.Send(UNLINK, CreateKey(EventsCollectionReadings, e.Id))
_ = conn.Send(ZREM, EventsCollection, storedKey)
_ = conn.Send(ZREM, EventsCollectionCreated, storedKey)
_ = conn.Send(ZREM, EventsCollectionOrigin, storedKey)
_ = conn.Send(ZREM, CreateKey(EventsCollectionDeviceName, e.DeviceName), storedKey)
queriesInQueue++

Expand Down Expand Up @@ -105,9 +104,9 @@ func (c *Client) DeleteEventsByAge(age int64) (edgeXerr errors.EdgeX) {
conn := c.Pool.Get()
defer conn.Close()

expireTimestamp := utils.MakeTimestamp() - age
expireTimestamp := time.Now().UnixNano() - age

eventIds, readingIds, err := getEventReadingIdsByKeyScoreRange(conn, EventsCollectionCreated, "0", strconv.FormatInt(expireTimestamp, 10))
eventIds, readingIds, err := getEventReadingIdsByKeyScoreRange(conn, EventsCollectionOrigin, "0", strconv.FormatInt(expireTimestamp, 10))
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
Expand All @@ -133,15 +132,10 @@ func addEvent(conn redis.Conn, e models.Event) (addedEvent models.Event, edgeXer
}
edgeXerr = nil

if e.Created == 0 {
e.Created = common.MakeTimestamp()
}

event := models.Event{
Id: e.Id,
DeviceName: e.DeviceName,
ProfileName: e.ProfileName,
Created: e.Created,
Origin: e.Origin,
Tags: e.Tags,
}
Expand All @@ -155,9 +149,9 @@ func addEvent(conn redis.Conn, e models.Event) (addedEvent models.Event, edgeXer
_ = conn.Send(MULTI)
// use the SET command to save event as blob
_ = conn.Send(SET, storedKey, m)
_ = conn.Send(ZADD, EventsCollection, e.Created, storedKey)
_ = conn.Send(ZADD, EventsCollectionCreated, e.Created, storedKey)
_ = conn.Send(ZADD, CreateKey(EventsCollectionDeviceName, e.DeviceName), e.Created, storedKey)
_ = conn.Send(ZADD, EventsCollection, e.Origin, storedKey)
_ = conn.Send(ZADD, EventsCollectionOrigin, e.Origin, storedKey)
_ = conn.Send(ZADD, CreateKey(EventsCollectionDeviceName, e.DeviceName), e.Origin, storedKey)

// add reading ids as sorted set under each event id
// sort by the order provided by device service
Expand Down Expand Up @@ -208,7 +202,7 @@ func deleteEventById(conn redis.Conn, id string) (edgeXerr errors.EdgeX) {
_ = conn.Send(UNLINK, storedKey)
_ = conn.Send(UNLINK, CreateKey(EventsCollectionReadings, e.Id))
_ = conn.Send(ZREM, EventsCollection, storedKey)
_ = conn.Send(ZREM, EventsCollectionCreated, storedKey)
_ = conn.Send(ZREM, EventsCollectionOrigin, storedKey)
_ = conn.Send(ZREM, CreateKey(EventsCollectionDeviceName, e.DeviceName), storedKey)

res, err := redis.Values(conn.Do(EXEC))
Expand Down Expand Up @@ -288,7 +282,7 @@ func eventsByDeviceName(conn redis.Conn, offset int, limit int, name string) (ev

// eventsByTimeRange query events by time range, offset, and limit
func eventsByTimeRange(conn redis.Conn, start int, end int, offset int, limit int) (events []models.Event, edgeXerr errors.EdgeX) {
objects, edgeXerr := getObjectsByScoreRange(conn, EventsCollectionCreated, start, end, offset, limit)
objects, edgeXerr := getObjectsByScoreRange(conn, EventsCollectionOrigin, start, end, offset, limit)
if edgeXerr != nil {
return events, edgeXerr
}
Expand Down
21 changes: 9 additions & 12 deletions internal/pkg/v2/infrastructure/redis/reading.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2020 IOTech Ltd
// Copyright (C) 2020-2021 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -20,7 +20,7 @@ import (

const (
ReadingsCollection = "cd|rd"
ReadingsCollectionCreated = ReadingsCollection + DBKeySeparator + v2.Created
ReadingsCollectionOrigin = ReadingsCollection + DBKeySeparator + v2.Origin
ReadingsCollectionDeviceName = ReadingsCollection + DBKeySeparator + v2.Device + DBKeySeparator + v2.Name
ReadingsCollectionResourceName = ReadingsCollection + DBKeySeparator + v2.ResourceName
)
Expand Down Expand Up @@ -55,7 +55,7 @@ func (c *Client) asyncDeleteReadingsByIds(readingIds []string) {
storedKey := readingStoredKey(r.Id)
_ = conn.Send(UNLINK, storedKey)
_ = conn.Send(ZREM, ReadingsCollection, storedKey)
_ = conn.Send(ZREM, ReadingsCollectionCreated, storedKey)
_ = conn.Send(ZREM, ReadingsCollectionOrigin, storedKey)
_ = conn.Send(ZREM, CreateKey(ReadingsCollectionDeviceName, r.DeviceName), storedKey)
_ = conn.Send(ZREM, CreateKey(ReadingsCollectionResourceName, r.ResourceName), storedKey)
queriesInQueue++
Expand Down Expand Up @@ -122,9 +122,9 @@ func addReading(conn redis.Conn, r models.Reading) (reading models.Reading, edge
// use the SET command to save reading as blob
_ = conn.Send(SET, storedKey, m)
_ = conn.Send(ZADD, ReadingsCollection, 0, storedKey)
_ = conn.Send(ZADD, ReadingsCollectionCreated, baseReading.Created, storedKey)
_ = conn.Send(ZADD, CreateKey(ReadingsCollectionDeviceName, baseReading.DeviceName), baseReading.Created, storedKey)
_ = conn.Send(ZADD, CreateKey(ReadingsCollectionResourceName, baseReading.ResourceName), baseReading.Created, storedKey)
_ = conn.Send(ZADD, ReadingsCollectionOrigin, baseReading.Origin, storedKey)
_ = conn.Send(ZADD, CreateKey(ReadingsCollectionDeviceName, baseReading.DeviceName), baseReading.Origin, storedKey)
_ = conn.Send(ZADD, CreateKey(ReadingsCollectionResourceName, baseReading.ResourceName), baseReading.Origin, storedKey)

return reading, nil
}
Expand All @@ -141,7 +141,7 @@ func deleteReadingById(conn redis.Conn, id string) (edgeXerr errors.EdgeX) {
_ = conn.Send(MULTI)
_ = conn.Send(UNLINK, storedKey)
_ = conn.Send(ZREM, ReadingsCollection, storedKey)
_ = conn.Send(ZREM, ReadingsCollectionCreated, storedKey)
_ = conn.Send(ZREM, ReadingsCollectionOrigin, storedKey)
_ = conn.Send(ZREM, CreateKey(ReadingsCollectionDeviceName, r.DeviceName), storedKey)
_ = conn.Send(ZREM, CreateKey(ReadingsCollectionResourceName, r.ResourceName), storedKey)
_, err := conn.Do(EXEC)
Expand All @@ -153,9 +153,6 @@ func deleteReadingById(conn redis.Conn, id string) (edgeXerr errors.EdgeX) {
}

func checkReadingValue(b *models.BaseReading) errors.EdgeX {
if b.Created == 0 {
b.Created = common.MakeTimestamp()
}
// check if id is a valid uuid
if b.Id == "" {
b.Id = uuid.New().String()
Expand Down Expand Up @@ -184,7 +181,7 @@ func allReadings(conn redis.Conn, offset int, limit int) (readings []models.Read
if limit == -1 { //-1 limit means that clients want to retrieve all remaining records after offset from DB, so specifying -1 for end
end = limit
}
objects, err := getObjectsBySomeRange(conn, ZREVRANGE, ReadingsCollectionCreated, offset, end)
objects, err := getObjectsBySomeRange(conn, ZREVRANGE, ReadingsCollectionOrigin, offset, end)
if err != nil {
return readings, errors.NewCommonEdgeXWrapper(err)
}
Expand Down Expand Up @@ -222,7 +219,7 @@ func readingsByDeviceName(conn redis.Conn, offset int, limit int, name string) (

// readingsByTimeRange query readings by time range, offset, and limit
func readingsByTimeRange(conn redis.Conn, start int, end int, offset int, limit int) (readings []models.Reading, edgeXerr errors.EdgeX) {
objects, edgeXerr := getObjectsByScoreRange(conn, ReadingsCollectionCreated, start, end, offset, limit)
objects, edgeXerr := getObjectsByScoreRange(conn, ReadingsCollectionOrigin, start, end, offset, limit)
if edgeXerr != nil {
return readings, edgeXerr
}
Expand Down
Loading