Skip to content

Commit

Permalink
Merge pull request #249 from dave-tucker/update-channels
Browse files Browse the repository at this point in the history
More Robust Error Handling For Cache Issues
  • Loading branch information
dave-tucker authored Oct 14, 2021
2 parents 2270cb7 + 4462560 commit 52b0298
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 39 deletions.
118 changes: 95 additions & 23 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,27 @@ const (
columnDelimiter = ","
)

// ErrCacheInconsistent is an error that can occur when an operation
// would cause the cache to be inconsistent
type ErrCacheInconsistent struct {
details string
}

// Error implements the error interface
func (e *ErrCacheInconsistent) Error() string {
msg := "cache inconsistent"
if e.details != "" {
msg += ": " + e.details
}
return msg
}

func NewErrCacheInconsistent(details string) *ErrCacheInconsistent {
return &ErrCacheInconsistent{
details: details,
}
}

// ErrIndexExists is returned when an item in the database cannot be inserted due to existing indexes
type ErrIndexExists struct {
Table string
Expand Down Expand Up @@ -115,7 +136,7 @@ func (r *RowCache) Create(uuid string, m model.Model, checkIndexes bool) error {
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.cache[uuid]; ok {
return fmt.Errorf("row %s already exists", uuid)
return NewErrCacheInconsistent(fmt.Sprintf("cannot create row %s as it already exists", uuid))
}
if reflect.TypeOf(m) != r.dataType {
return fmt.Errorf("expected data of type %s, but got %s", r.dataType.String(), reflect.TypeOf(m).String())
Expand Down Expand Up @@ -153,7 +174,7 @@ func (r *RowCache) Update(uuid string, m model.Model, checkIndexes bool) error {
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.cache[uuid]; !ok {
return fmt.Errorf("row %s does not exist", uuid)
return NewErrCacheInconsistent(fmt.Sprintf("cannot update row %s as it does not exist in the cache", uuid))
}
oldRow := model.Clone(r.cache[uuid])
oldInfo, err := mapper.NewInfo(&r.schema, oldRow)
Expand Down Expand Up @@ -249,7 +270,7 @@ func (r *RowCache) Delete(uuid string) error {
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.cache[uuid]; !ok {
return fmt.Errorf("row %s does not exist", uuid)
return NewErrCacheInconsistent(fmt.Sprintf("cannot delete row %s as it does not exist in the cache", uuid))
}
oldRow := r.cache[uuid]
oldInfo, err := mapper.NewInfo(&r.schema, oldRow)
Expand Down Expand Up @@ -409,6 +430,9 @@ type TableCache struct {
mapper *mapper.Mapper
dbModel *model.DBModel
schema *ovsdb.DatabaseSchema
updates chan ovsdb.TableUpdates
updates2 chan ovsdb.TableUpdates2
errorChan chan error
ovsdb.NotificationHandler
mutex sync.RWMutex
}
Expand Down Expand Up @@ -444,6 +468,9 @@ func NewTableCache(schema *ovsdb.DatabaseSchema, dbModel *model.DBModel, data Da
mapper: mapper.NewMapper(schema),
dbModel: dbModel,
mutex: sync.RWMutex{},
updates: make(chan ovsdb.TableUpdates, bufferSize),
updates2: make(chan ovsdb.TableUpdates2, bufferSize),
errorChan: make(chan error),
}, nil
}

Expand Down Expand Up @@ -479,21 +506,23 @@ func (t *TableCache) Tables() []string {
}

// Update implements the update method of the NotificationHandler interface
// this populates the cache with new updates
// this populates a channel with updates so they can be processed after the initial
// state has been Populated
func (t *TableCache) Update(context interface{}, tableUpdates ovsdb.TableUpdates) {
if len(tableUpdates) == 0 {
return
}
t.Populate(tableUpdates)
t.updates <- tableUpdates
}

// Update2 implements the update method of the NotificationHandler interface
// this populates the cache with new updates
// this populates a channel with updates so they can be processed after the initial
// state has been Populated
func (t *TableCache) Update2(context interface{}, tableUpdates ovsdb.TableUpdates2) {
if len(tableUpdates) == 0 {
return
}
t.Populate2(tableUpdates)
t.updates2 <- tableUpdates
}

// Locked implements the locked method of the NotificationHandler interface
Expand All @@ -513,7 +542,7 @@ func (t *TableCache) Disconnected() {
}

// Populate adds data to the cache and places an event on the channel
func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) error {
t.mutex.Lock()
defer t.mutex.Unlock()

Expand All @@ -527,40 +556,41 @@ func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
if row.New != nil {
newModel, err := t.CreateModel(table, row.New, uuid)
if err != nil {
panic(err)
return err
}
if existing := tCache.Row(uuid); existing != nil {
if !reflect.DeepEqual(newModel, existing) {
if err := tCache.Update(uuid, newModel, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(updateEvent, table, existing, newModel)
}
// no diff
continue
}
if err := tCache.Create(uuid, newModel, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(addEvent, table, nil, newModel)
continue
} else {
oldModel, err := t.CreateModel(table, row.Old, uuid)
if err != nil {
panic(err)
return err
}
if err := tCache.Delete(uuid); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(deleteEvent, table, oldModel, nil)
continue
}
}
}
return nil
}

// Populate2 adds data to the cache and places an event on the channel
func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) error {
t.mutex.Lock()
defer t.mutex.Unlock()
for table := range t.dbModel.Types() {
Expand All @@ -574,19 +604,19 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
case row.Initial != nil:
m, err := t.CreateModel(table, row.Initial, uuid)
if err != nil {
panic(err)
return err
}
if err := tCache.Create(uuid, m, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(addEvent, table, nil, m)
case row.Insert != nil:
m, err := t.CreateModel(table, row.Insert, uuid)
if err != nil {
panic(err)
return err
}
if err := tCache.Create(uuid, m, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(addEvent, table, nil, m)
case row.Modify != nil:
Expand All @@ -597,11 +627,11 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
modified := tCache.Row(uuid)
err := t.ApplyModifications(table, modified, *row.Modify)
if err != nil {
panic(err)
return err
}
if !reflect.DeepEqual(modified, existing) {
if err := tCache.Update(uuid, modified, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(updateEvent, table, existing, modified)
}
Expand All @@ -615,12 +645,13 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
panic(fmt.Errorf("row with uuid %s does not exist", uuid))
}
if err := tCache.Delete(uuid); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(deleteEvent, table, m, nil)
}
}
}
return nil
}

// Purge drops all data in the cache and reinitializes it using the
Expand All @@ -639,9 +670,50 @@ func (t *TableCache) AddEventHandler(handler EventHandler) {
t.eventProcessor.AddEventHandler(handler)
}

// Run starts the event processing loop. It blocks until the channel is closed.
// Run starts the event processing and update processing loops.
// It blocks until the stop channel is closed.
// Once closed, it clears the updates/updates2 channels to ensure we don't process stale updates on a new connection
func (t *TableCache) Run(stopCh <-chan struct{}) {
t.eventProcessor.Run(stopCh)
wg := sync.WaitGroup{}
wg.Add(1)
go t.processUpdates(stopCh)
wg.Add(1)
go t.eventProcessor.Run(stopCh)
wg.Wait()
t.updates = make(chan ovsdb.TableUpdates, bufferSize)
t.updates2 = make(chan ovsdb.TableUpdates2, bufferSize)
}

// Errors returns a channel where errors that occur during cache propagation can be received
func (t *TableCache) Errors() <-chan error {
return t.errorChan
}

func (t *TableCache) processUpdates(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
return
case update := <-t.updates:
if err := t.Populate(update); err != nil {
select {
case t.errorChan <- err:
// error sent to client
default:
// client not listening for errors
}
}
case update2 := <-t.updates2:
if err := t.Populate2(update2); err != nil {
select {
case t.errorChan <- err:
// error sent to client
default:
// client not listening for errors
}
}
}
}
}

// newRowCache creates a new row cache with the provided data
Expand Down
30 changes: 20 additions & 10 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,8 @@ func TestTableCache_populate(t *testing.T) {
},
},
}
tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)
Expand All @@ -733,7 +734,8 @@ func TestTableCache_populate(t *testing.T) {
Old: &testRow,
New: &updatedRow,
}
tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)
Expand All @@ -744,7 +746,8 @@ func TestTableCache_populate(t *testing.T) {
New: nil,
}

tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
Expand Down Expand Up @@ -786,7 +789,8 @@ func TestTableCachePopulate(t *testing.T) {
},
},
}
tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)
Expand All @@ -798,7 +802,8 @@ func TestTableCachePopulate(t *testing.T) {
Old: &testRow,
New: &updatedRow,
}
tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)
Expand All @@ -809,7 +814,8 @@ func TestTableCachePopulate(t *testing.T) {
New: nil,
}

tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
Expand Down Expand Up @@ -851,7 +857,8 @@ func TestTableCachePopulate2(t *testing.T) {
}

t.Log("Initial")
tc.Populate2(updates)
err = tc.Populate2(updates)
require.NoError(t, err)
got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)

Expand All @@ -865,7 +872,8 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
err = tc.Populate2(updates)
require.NoError(t, err)
got = tc.Table("Open_vSwitch").Row("test2")
assert.Equal(t, testRowModel2, got)

Expand All @@ -879,7 +887,8 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
err = tc.Populate2(updates)
require.NoError(t, err)
got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)

Expand All @@ -892,7 +901,8 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
err = tc.Populate2(updates)
require.NoError(t, err)
_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
}
Expand Down
Loading

0 comments on commit 52b0298

Please sign in to comment.