Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes synchronicity between transactions/updates #254

Merged
merged 2 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 20 additions & 45 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,6 @@ type TableCache struct {
cache map[string]*RowCache
eventProcessor *eventProcessor
dbModel *model.DatabaseModel
updates chan ovsdb.TableUpdates
updates2 chan ovsdb.TableUpdates2
errorChan chan error
ovsdb.NotificationHandler
mutex sync.RWMutex
Expand Down Expand Up @@ -472,8 +470,6 @@ func NewTableCache(dbModel *model.DatabaseModel, data Data, logger *logr.Logger)
eventProcessor: eventProcessor,
dbModel: dbModel,
mutex: sync.RWMutex{},
updates: make(chan ovsdb.TableUpdates, bufferSize),
updates2: make(chan ovsdb.TableUpdates2, bufferSize),
errorChan: make(chan error),
logger: logger,
}, nil
Expand Down Expand Up @@ -513,21 +509,31 @@ func (t *TableCache) Tables() []string {
// Update implements the update method of the NotificationHandler interface
// this populates a channel with updates so they can be processed after the initial
// state has been Populated
func (t *TableCache) Update(context interface{}, tableUpdates ovsdb.TableUpdates) {
func (t *TableCache) Update(context interface{}, tableUpdates ovsdb.TableUpdates) error {
if len(tableUpdates) == 0 {
return
return nil
}
if err := t.Populate(tableUpdates); err != nil {
t.logger.Error(err, "during libovsdb cache populate")
t.errorChan <- NewErrCacheInconsistent(err.Error())
return err
}
t.updates <- tableUpdates
return nil
}

// Update2 implements the update method of the NotificationHandler interface
// this populates a channel with updates so they can be processed after the initial
// state has been Populated
func (t *TableCache) Update2(context interface{}, tableUpdates ovsdb.TableUpdates2) {
func (t *TableCache) Update2(context interface{}, tableUpdates ovsdb.TableUpdates2) error {
if len(tableUpdates) == 0 {
return
return nil
}
if err := t.Populate2(tableUpdates); err != nil {
t.logger.Error(err, "during libovsdb cache populate2")
t.errorChan <- NewErrCacheInconsistent(err.Error())
return err
}
t.updates2 <- tableUpdates
return nil
}

// Locked implements the locked method of the NotificationHandler interface
Expand Down Expand Up @@ -634,12 +640,12 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) error {
case row.Modify != nil:
existing := tCache.Row(uuid)
if existing == nil {
panic(fmt.Errorf("row with uuid %s does not exist", uuid))
return fmt.Errorf("row with uuid %s does not exist", uuid)
}
modified := tCache.Row(uuid)
err := t.ApplyModifications(table, modified, *row.Modify)
if err != nil {
return err
return fmt.Errorf("unable to apply row modifications: %v", err)
}
if !reflect.DeepEqual(modified, existing) {
t.logger.V(5).Info("updating row", "uuid", uuid, "old", fmt.Sprintf("%+v", existing), "new", fmt.Sprintf("%+v", modified))
Expand All @@ -655,7 +661,7 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) error {
// no value on the wire), then process a delete
m := tCache.Row(uuid)
if m == nil {
panic(fmt.Errorf("row with uuid %s does not exist", uuid))
return fmt.Errorf("row with uuid %s does not exist", uuid)
}
t.logger.V(5).Info("deleting row", "uuid", uuid, "model", fmt.Sprintf("%+v", m))
if err := tCache.Delete(uuid); err != nil {
Expand Down Expand Up @@ -691,46 +697,15 @@ func (t *TableCache) AddEventHandler(handler EventHandler) {
func (t *TableCache) Run(stopCh <-chan struct{}) {
wg := sync.WaitGroup{}
wg.Add(1)
go t.processUpdates(stopCh)
wg.Add(1)
go t.eventProcessor.Run(stopCh)
wg.Wait()
t.updates = make(chan ovsdb.TableUpdates, bufferSize)
t.updates2 = make(chan ovsdb.TableUpdates2, bufferSize)
}

// Errors returns a channel where errors that occur during cache propagation can be received
func (t *TableCache) Errors() <-chan error {
return t.errorChan
}

func (t *TableCache) processUpdates(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
return
case update := <-t.updates:
if err := t.Populate(update); err != nil {
select {
case t.errorChan <- err:
// error sent to client
default:
// client not listening for errors
}
}
case update2 := <-t.updates2:
if err := t.Populate2(update2); err != nil {
select {
case t.errorChan <- err:
// error sent to client
default:
// client not listening for errors
}
}
}
}
}

// newRowCache creates a new row cache with the provided data
// if the data is nil, an empty RowCache will be created
func newRowCache(name string, dbModel *model.DatabaseModel, dataType reflect.Type) *RowCache {
Expand Down Expand Up @@ -955,7 +930,7 @@ func (t *TableCache) ApplyModifications(tableName string, base model.Model, upda
}
// With a pointer type, an update value could be a set with 2 elements [old, new]
if nv.Len() != 2 {
panic("expected a slice with 2 elements")
return fmt.Errorf("expected a slice with 2 elements for update: %+v", update)
}
// the new value is the value in the slice which isn't equal to the existing string
for i := 0; i < nv.Len(); i++ {
Expand Down
120 changes: 100 additions & 20 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type Client interface {
API
}

type bufferedUpdate struct {
trozet marked this conversation as resolved.
Show resolved Hide resolved
updates *ovsdb.TableUpdates
updates2 *ovsdb.TableUpdates2
lastTxnID string
}

// ovsdbClient is an OVSDB client
type ovsdbClient struct {
options *options
Expand Down Expand Up @@ -94,6 +100,10 @@ type database struct {
// any ongoing monitors, so we can re-create them if we disconnect
monitors map[string]*Monitor
monitorsMutex sync.Mutex

// tracks any outstanding updates while waiting for a monitor response
deferUpdates bool
deferredUpdates []*bufferedUpdate
}

// NewOVSDBClient creates a new OVSDB Client with the provided
Expand All @@ -110,8 +120,10 @@ func newOVSDBClient(clientDBModel *model.ClientDBModel, opts ...Option) (*ovsdbC
primaryDBName: clientDBModel.Name(),
databases: map[string]*database{
clientDBModel.Name(): {
model: model.NewPartialDatabaseModel(clientDBModel),
monitors: make(map[string]*Monitor),
model: model.NewPartialDatabaseModel(clientDBModel),
monitors: make(map[string]*Monitor),
deferUpdates: true,
deferredUpdates: make([]*bufferedUpdate, 0),
},
},
disconnect: make(chan struct{}),
Expand Down Expand Up @@ -479,6 +491,7 @@ func (o *ovsdbClient) echo(args []interface{}, reply *[]interface{}) error {
// - table-updates: map of table name to table-update. Table-update is a map of uuid to (old, new) row pairs
func (o *ovsdbClient) update(params []json.RawMessage, reply *[]interface{}) error {
cookie := MonitorCookie{}
*reply = []interface{}{}
if len(params) > 2 {
return fmt.Errorf("update requires exactly 2 args")
}
Expand All @@ -499,17 +512,27 @@ func (o *ovsdbClient) update(params []json.RawMessage, reply *[]interface{}) err
for tableName := range updates {
o.metrics.numTableUpdates.WithLabelValues(cookie.DatabaseName, tableName).Inc()
}

db.cacheMutex.Lock()
if db.deferUpdates {
trozet marked this conversation as resolved.
Show resolved Hide resolved
db.deferredUpdates = append(db.deferredUpdates, &bufferedUpdate{&updates, nil, ""})
db.cacheMutex.Unlock()
return nil
}
db.cacheMutex.Unlock()

// Update the local DB cache with the tableUpdates
db.cacheMutex.RLock()
db.cache.Update(cookie.ID, updates)
err = db.cache.Update(cookie.ID, updates)
db.cacheMutex.RUnlock()
*reply = []interface{}{}
return nil

return err
}

// update2 handling from ovsdb-server.7
func (o *ovsdbClient) update2(params []json.RawMessage, reply *[]interface{}) error {
cookie := MonitorCookie{}
*reply = []interface{}{}
if len(params) > 2 {
return fmt.Errorf("update2 requires exactly 2 args")
}
Expand All @@ -526,17 +549,27 @@ func (o *ovsdbClient) update2(params []json.RawMessage, reply *[]interface{}) er
if db == nil {
return fmt.Errorf("update: invalid database name: %s unknown", cookie.DatabaseName)
}

db.cacheMutex.Lock()
if db.deferUpdates {
trozet marked this conversation as resolved.
Show resolved Hide resolved
db.deferredUpdates = append(db.deferredUpdates, &bufferedUpdate{nil, &updates, ""})
db.cacheMutex.Unlock()
return nil
}
db.cacheMutex.Unlock()

// Update the local DB cache with the tableUpdates
db.cacheMutex.RLock()
db.cache.Update2(cookie, updates)
err = db.cache.Update2(cookie, updates)
db.cacheMutex.RUnlock()
*reply = []interface{}{}
return nil

return err
}

// update3 handling from ovsdb-server.7
func (o *ovsdbClient) update3(params []json.RawMessage, reply *[]interface{}) error {
cookie := MonitorCookie{}
*reply = []interface{}{}
if len(params) > 3 {
return fmt.Errorf("update requires exactly 3 args")
}
Expand All @@ -559,17 +592,28 @@ func (o *ovsdbClient) update3(params []json.RawMessage, reply *[]interface{}) er
if db == nil {
return fmt.Errorf("update: invalid database name: %s unknown", cookie.DatabaseName)
}
db.monitorsMutex.Lock()
mon := db.monitors[cookie.ID]
mon.LastTransactionID = lastTransactionID
db.monitorsMutex.Unlock()

db.cacheMutex.Lock()
if db.deferUpdates {
db.deferredUpdates = append(db.deferredUpdates, &bufferedUpdate{nil, &updates, lastTransactionID})
db.cacheMutex.Unlock()
return nil
trozet marked this conversation as resolved.
Show resolved Hide resolved
}
db.cacheMutex.Unlock()

// Update the local DB cache with the tableUpdates
db.cacheMutex.RLock()
db.cache.Update2(cookie, updates)
err = db.cache.Update2(cookie, updates)
db.cacheMutex.RUnlock()
*reply = []interface{}{}
return nil

if err == nil {
db.monitorsMutex.Lock()
mon := db.monitors[cookie.ID]
mon.LastTransactionID = lastTransactionID
db.monitorsMutex.Unlock()
}

return err
}

// getSchema returns the schema in use for the provided database name
Expand Down Expand Up @@ -684,6 +728,7 @@ func (o *ovsdbClient) Monitor(ctx context.Context, monitor *Monitor) (MonitorCoo
return cookie, o.monitor(ctx, cookie, false, monitor)
}

//gocyclo:ignore
func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconnecting bool, monitor *Monitor) error {
if len(monitor.Tables) == 0 {
return fmt.Errorf("at least one table should be monitored")
Expand Down Expand Up @@ -792,17 +837,41 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
o.metrics.numMonitors.Inc()
}

db.cacheMutex.Lock()
defer db.cacheMutex.Unlock()
if monitor.Method == ovsdb.MonitorRPC {
u := tableUpdates.(ovsdb.TableUpdates)
db.cacheMutex.Lock()
defer db.cacheMutex.Unlock()
err = db.cache.Populate(u)
} else {
u := tableUpdates.(ovsdb.TableUpdates2)
db.cacheMutex.Lock()
defer db.cacheMutex.Unlock()
err = db.cache.Populate2(u)
}

if err != nil {
return err
}

// populate any deferred updates
db.deferUpdates = false
trozet marked this conversation as resolved.
Show resolved Hide resolved
for _, update := range db.deferredUpdates {
if update.updates != nil {
if err = db.cache.Populate(*update.updates); err != nil {
return err
}
}

if update.updates2 != nil {
if err = db.cache.Populate2(*update.updates2); err != nil {
return err
}
}
if len(update.lastTxnID) > 0 {
db.monitorsMutex.Lock()
db.monitors[cookie.ID].LastTransactionID = update.lastTxnID
db.monitorsMutex.Unlock()
}
}

return err
}

Expand Down Expand Up @@ -879,6 +948,15 @@ func (o *ovsdbClient) handleCacheErrors(stopCh <-chan struct{}, errorChan <-chan
// trigger a reconnect, which will purge the cache
// hopefully a rebuild will fix any inconsistency
o.options.logger.V(3).Error(err, "triggering reconnect to rebuild cache")
// for rebuilding cache with mon_cond_since (not yet fully supported in libovsdb) we
// need to reset the last txn ID
for _, db := range o.databases {
db.monitorsMutex.Lock()
for _, mon := range db.monitors {
mon.LastTransactionID = emptyUUID
}
db.monitorsMutex.Unlock()
}
o.Disconnect()
} else {
o.options.logger.V(3).Error(err, "error updating cache")
Expand All @@ -889,7 +967,7 @@ func (o *ovsdbClient) handleCacheErrors(stopCh <-chan struct{}, errorChan <-chan

func (o *ovsdbClient) handleDisconnectNotification() {
<-o.rpcClient.DisconnectNotify()
// close the stopCh, which will stop the cache event processor and update processing
// close the stopCh, which will stop the cache event processor
close(o.stopCh)
o.metrics.numDisconnects.Inc()
o.rpcMutex.Lock()
Expand Down Expand Up @@ -924,6 +1002,8 @@ func (o *ovsdbClient) handleDisconnectNotification() {
db.cacheMutex.Lock()
defer db.cacheMutex.Unlock()
db.cache = nil
// need to defer updates if/when we reconnect
db.deferUpdates = true

db.schemaMutex.Lock()
defer db.schemaMutex.Unlock()
Expand Down