Skip to content

Commit

Permalink
shadow: reconstruct Desired from Delta (#238)
Browse files Browse the repository at this point in the history
* Reconstruct Desired state from Delta
* Add MaybeIncomplete flag
  • Loading branch information
at-wat authored Mar 22, 2021
1 parent 825c0ae commit e04d2b6
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 35 deletions.
174 changes: 147 additions & 27 deletions shadow/shadow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -436,39 +437,158 @@ func TestOnDelta(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s.(*shadow).doc.Version = 9
cli.Handle(s)

expected := NestedState{
"key": "value",
testSeq := []struct {
expectedDelta NestedState
req *thingDelta
expectedDoc *ThingDocument
}{
{
expectedDelta: NestedState{
"key": "value",
},
req: &thingDelta{
State: NestedState{
"key": "value",
},
Metadata: NestedMetadata{
"key": Metadata{Timestamp: 1},
},
Version: 10,
Timestamp: 1,
},
expectedDoc: &ThingDocument{
State: ThingState{
Desired: NestedState{"key": "value"},
Delta: NestedState{"key": "value"},
Reported: NestedState{},
},
Metadata: ThingStateMetadata{
Desired: NestedMetadata{"key": Metadata{Timestamp: 1}},
Delta: NestedMetadata{"key": Metadata{Timestamp: 1}},
Reported: NestedMetadata{},
},
Version: 10,
Timestamp: 1,
},
},
{
expectedDelta: NestedState{
"key2": "value2",
},
req: &thingDelta{
State: NestedState{
"key2": "value2",
},
Metadata: NestedMetadata{
"key2": Metadata{Timestamp: 2},
},
Version: 11,
Timestamp: 2,
},
expectedDoc: &ThingDocument{
State: ThingState{
Desired: NestedState{
"key": "value",
"key2": "value2",
},
Delta: NestedState{
"key2": "value2",
},
Reported: NestedState{},
},
Metadata: ThingStateMetadata{
Desired: NestedMetadata{
"key": Metadata{Timestamp: 1},
"key2": Metadata{Timestamp: 2},
},
Delta: NestedMetadata{
"key2": Metadata{Timestamp: 2},
},
Reported: NestedMetadata{},
},
Version: 11,
Timestamp: 2,
},
},
{
expectedDelta: NestedState{
"key": "value3",
},
req: &thingDelta{
State: NestedState{
"key": "value3",
},
Metadata: NestedMetadata{
"key": Metadata{Timestamp: 4},
},
Version: 13,
Timestamp: 4,
},
expectedDoc: &ThingDocument{
State: ThingState{
Desired: NestedState{
"key": "value3",
"key2": "value2",
},
Delta: NestedState{
"key": "value3",
},
Reported: NestedState{},
},
Metadata: ThingStateMetadata{
Desired: NestedMetadata{
"key": Metadata{Timestamp: 4},
"key2": Metadata{Timestamp: 2},
},
Delta: NestedMetadata{
"key": Metadata{Timestamp: 4},
},
Reported: NestedMetadata{},
},
Version: 13,
Timestamp: 4,
MaybeIncomplete: true,
},
},
}

done := make(chan struct{})
s.OnDelta(func(delta NestedState) {
if !reflect.DeepEqual(expected, delta) {
t.Fatalf("Expected delta:\n%+v\ngot:\n%+v", expected, delta)
}
close(done)
})
for i, seq := range testSeq {
expectedDelta := seq.expectedDelta
req := seq.req
expectedDoc := seq.expectedDoc

req := &thingDelta{
State: NestedState{
"key": "value",
},
Version: 10,
}
breq, err := json.Marshal(req)
if err != nil {
t.Fatal(err)
}
cli.Serve(&mqtt.Message{
Topic: s.(*shadow).topic("update/delta"),
Payload: breq,
})
t.Run(fmt.Sprintf("Seq%d", i), func(t *testing.T) {
done := make(chan struct{})
s.OnDelta(func(delta NestedState) {
if !reflect.DeepEqual(expectedDelta, delta) {
t.Fatalf("Expected delta:\n%+v\ngot:\n%+v", expectedDelta, delta)
}
close(done)
})

select {
case <-done:
case <-ctx.Done():
t.Fatal("Timeout")
breq, err := json.Marshal(req)
if err != nil {
t.Fatal(err)
}
cli.Serve(&mqtt.Message{
Topic: s.(*shadow).topic("update/delta"),
Payload: breq,
})

select {
case <-done:
case <-ctx.Done():
t.Fatal("Timeout")
}

doc := s.Document()
if !reflect.DeepEqual(expectedDoc, doc) {
t.Errorf("Expected document:\n%+v\ngot:\n%+v", expectedDoc, doc)
}
})
}
}

Expand Down
29 changes: 21 additions & 8 deletions shadow/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type ThingDocument struct {
Version int `json:"version,omitempty"`
Timestamp int `json:"timestamp,omitempty"`
ClientToken string `json:"clientToken,omitempty"`

MaybeIncomplete bool `json:"-"`
}

type thingStateRaw struct {
Expand Down Expand Up @@ -128,10 +130,20 @@ func (s *ThingDocument) updateDelta(state *thingDelta) bool {
// Received an old version; just ignore it.
return false
}

if s.Version+1 != state.Version {
// Versions were dropped.
s.MaybeIncomplete = true
}

s.Version = state.Version
s.Timestamp = state.Timestamp
s.State.Delta = state.State
s.Metadata.Delta = state.Metadata

updateState(s.State.Desired, state.State)
updateState(s.Metadata.Desired, state.Metadata)

return true
}

Expand All @@ -156,7 +168,8 @@ func updateStateRaw(state NestedState, update json.RawMessage) error {
if err := json.Unmarshal([]byte(update), &u); err != nil {
return ioterr.New(err, "unmarshaling update")
}
return updateState(state, u)
updateState(state, u)
return nil
}

func updateStateMetadataRaw(state NestedMetadata, update json.RawMessage) error {
Expand All @@ -167,23 +180,23 @@ func updateStateMetadataRaw(state NestedMetadata, update json.RawMessage) error
if err := json.Unmarshal([]byte(update), &u); err != nil {
return ioterr.New(err, "unmarshaling update")
}
return updateState(state, u)
updateState(state, u)
return nil
}

func updateState(state map[string]interface{}, update map[string]interface{}) error {
func updateState(state map[string]interface{}, update map[string]interface{}) {
if len(update) == 0 {
for k := range state {
delete(state, k)
}
return nil
return
}
for key, val := range update {
switch v := val.(type) {
case NestedState:
if s, ok := state[key].(NestedState); ok {
if err := updateState(s, v); err != nil {
return ioterr.New(err, "updating state")
}
updateState(s, v)
return
} else {
state[key] = v
}
Expand All @@ -195,7 +208,7 @@ func updateState(state map[string]interface{}, update map[string]interface{}) er
state[key] = v
}
}
return nil
return
}

func hasUpdate(s json.RawMessage) bool {
Expand Down

0 comments on commit e04d2b6

Please sign in to comment.