From 4922601a112a500aeccc6fb350dfa1f244f6cae8 Mon Sep 17 00:00:00 2001 From: anand-kumar-subramanian <51383315+anand-kumar-subramanian@users.noreply.github.com> Date: Thu, 30 Jul 2020 16:17:59 -0700 Subject: [PATCH] Bulk and RPC API support in translib (#16) Bulk and RPC API support in translib --- translib/acl_app.go | 13 ++ translib/app_interface.go | 2 + translib/common_app.go | 12 ++ translib/db/db.go | 73 ++++++--- translib/intf_app.go | 12 ++ translib/lldp_app.go | 12 ++ translib/pfm_app.go | 13 ++ translib/sys_app.go | 13 ++ translib/translib.go | 302 ++++++++++++++++++++++++++++++++++++++ 9 files changed, 433 insertions(+), 19 deletions(-) diff --git a/translib/acl_app.go b/translib/acl_app.go index 628d4ea270dc..ba86b5e471db 100644 --- a/translib/acl_app.go +++ b/translib/acl_app.go @@ -21,6 +21,7 @@ package translib import ( "bytes" + "errors" "fmt" "reflect" "strconv" @@ -166,6 +167,11 @@ func (app *AclApp) translateGet(dbs [db.MaxDB]*db.DB) error { return err } +func (app *AclApp) translateAction(dbs [db.MaxDB]*db.DB) error { + err := errors.New("Not supported") + return err +} + func (app *AclApp) translateSubscribe(dbs [db.MaxDB]*db.DB, path string) (*notificationOpts, *notificationInfo, error) { pathInfo := NewPathInfo(path) notifInfo := notificationInfo{dbno: db.ConfigDB} @@ -288,6 +294,13 @@ func (app *AclApp) processGet(dbs [db.MaxDB]*db.DB) (GetResponse, error) { return GetResponse{Payload: payload}, err } +func (app *AclApp) processAction(dbs [db.MaxDB]*db.DB) (ActionResponse, error) { + var resp ActionResponse + err := errors.New("Not implemented") + + return resp, err +} + func (app *AclApp) translateCRUCommon(d *db.DB, opcode int) ([]db.WatchKeys, error) { var err error var keys []db.WatchKeys diff --git a/translib/app_interface.go b/translib/app_interface.go index 998a2cf1e03f..bb56988a0fdc 100644 --- a/translib/app_interface.go +++ b/translib/app_interface.go @@ -79,12 +79,14 @@ type appInterface interface { translateReplace(d *db.DB) ([]db.WatchKeys, error) translateDelete(d *db.DB) ([]db.WatchKeys, error) translateGet(dbs [db.MaxDB]*db.DB) error + translateAction(dbs [db.MaxDB]*db.DB) error translateSubscribe(dbs [db.MaxDB]*db.DB, path string) (*notificationOpts, *notificationInfo, error) processCreate(d *db.DB) (SetResponse, error) processUpdate(d *db.DB) (SetResponse, error) processReplace(d *db.DB) (SetResponse, error) processDelete(d *db.DB) (SetResponse, error) processGet(dbs [db.MaxDB]*db.DB) (GetResponse, error) + processAction(dbs [db.MaxDB]*db.DB) (ActionResponse, error) } //App modules will use this function to register with App interface during boot up diff --git a/translib/common_app.go b/translib/common_app.go index ffaa250084e6..6f57ddabfa0a 100644 --- a/translib/common_app.go +++ b/translib/common_app.go @@ -112,6 +112,11 @@ func (app *CommonApp) translateGet(dbs [db.MaxDB]*db.DB) error { return err } +func (app *CommonApp) translateAction(dbs [db.MaxDB]*db.DB) error { + err := errors.New("Not supported") + return err +} + func (app *CommonApp) translateSubscribe(dbs [db.MaxDB]*db.DB, path string) (*notificationOpts, *notificationInfo, error) { err := errors.New("Not supported") notifInfo := notificationInfo{dbno: db.ConfigDB} @@ -205,6 +210,13 @@ func (app *CommonApp) processGet(dbs [db.MaxDB]*db.DB) (GetResponse, error) { return GetResponse{Payload: resPayload}, err } +func (app *CommonApp) processAction(dbs [db.MaxDB]*db.DB) (ActionResponse, error) { + var resp ActionResponse + err := errors.New("Not implemented") + + return resp, err +} + func (app *CommonApp) translateCRUDCommon(d *db.DB, opcode int) ([]db.WatchKeys, error) { var err error var keys []db.WatchKeys diff --git a/translib/db/db.go b/translib/db/db.go index 0469639fd86a..4b5cdc394f6a 100644 --- a/translib/db/db.go +++ b/translib/db/db.go @@ -147,8 +147,9 @@ func(dbNo DBNum) String() string { type Options struct { DBNo DBNum InitIndicator string - TableNameSeparator string - KeySeparator string + TableNameSeparator string //Overriden by the DB config file's separator. + KeySeparator string //Overriden by the DB config file's separator. + IsWriteDisabled bool //Indicated if write is allowed DisableCVLCheck bool } @@ -580,18 +581,20 @@ func (d *DB) doWrite(ts * TableSpec, op _txOp, key Key, val interface{}) error { var e error = nil var value Value + if d.Opts.IsWriteDisabled { + glog.Error("doWrite: Write to DB disabled") + e = errors.New("Write to DB disabled during this operation") + goto doWriteExit + } + switch d.txState { case txStateNone: - if glog.V(2) { - glog.Info("doWrite: No Transaction.") - } - break + glog.Info("doWrite: No Transaction.") case txStateWatch: if glog.V(2) { glog.Info("doWrite: Change to txStateSet, txState: ", d.txState) } d.txState = txStateSet - break case txStateSet: if glog.V(5) { glog.Info("doWrite: Remain in txStateSet, txState: ", d.txState) @@ -653,7 +656,7 @@ func (d *DB) doWrite(ts * TableSpec, op _txOp, key Key, val interface{}) error { // Transaction case. - glog.Info("doWrite: op: ", op, " ", key, " : ", value) + glog.Info("doWrite: op: ", op, " ", d.key2redis(ts, key), " : ", value) switch op { case txOpHMSet, txOpHDel: @@ -1099,7 +1102,6 @@ func (d *DB) StartTx(w []WatchKeys, tss []*TableSpec) error { } var e error = nil - var args []interface{} var ret cvl.CVLRetCode //Start CVL session @@ -1115,6 +1117,44 @@ func (d *DB) StartTx(w []WatchKeys, tss []*TableSpec) error { goto StartTxExit } + e = d.performWatch(w, tss) + +StartTxExit: + + if glog.V(3) { + glog.Info("StartTx: End: e: ", e) + } + return e +} + +func (d *DB) AppendWatchTx(w []WatchKeys, tss []*TableSpec) error { + if glog.V(3) { + glog.Info("AppendWatchTx: Begin: w: ", w, " tss: ", tss) + } + + var e error = nil + + // Validate State + if d.txState == txStateNone { + glog.Error("AppendWatchTx: Incorrect State, txState: ", d.txState) + e = errors.New("Transaction has not started") + goto AppendWatchTxExit + } + + e = d.performWatch(w, tss) + +AppendWatchTxExit: + + if glog.V(3) { + glog.Info("AppendWatchTx: End: e: ", e) + } + return e +} + +func (d *DB) performWatch(w []WatchKeys, tss []*TableSpec) error { + var e error + var args []interface{} + // For each watchkey // If a pattern, Get the keys, appending results to Cmd args. // Else append keys to the Cmd args @@ -1133,7 +1173,7 @@ func (d *DB) StartTx(w []WatchKeys, tss []*TableSpec) error { redisKeys, e := d.client.Keys(redisKey).Result() if e != nil { - glog.Warning("StartTx: Keys: " + e.Error()) + glog.Warning("performWatch: Keys: " + e.Error()) continue } for j := 0; j < len(redisKeys); j++ { @@ -1148,27 +1188,22 @@ func (d *DB) StartTx(w []WatchKeys, tss []*TableSpec) error { } if len(args) == 1 { - glog.Warning("StartTx: Empty WatchKeys. Skipping WATCH") - goto StartTxSkipWatch + glog.Warning("performWatch: Empty WatchKeys. Skipping WATCH") + goto SkipWatch } // Issue the WATCH _, e = d.client.Do(args...).Result() if e != nil { - glog.Warning("StartTx: Do: WATCH ", args, " e: ", e.Error()) + glog.Warning("performWatch: Do: WATCH ", args, " e: ", e.Error()) } -StartTxSkipWatch: +SkipWatch: // Switch State d.txState = txStateWatch -StartTxExit: - - if glog.V(3) { - glog.Info("StartTx: End: e: ", e) - } return e } diff --git a/translib/intf_app.go b/translib/intf_app.go index 8e417bb88b87..b85fe95047d1 100644 --- a/translib/intf_app.go +++ b/translib/intf_app.go @@ -245,6 +245,11 @@ func (app *IntfApp) translateGet(dbs [db.MaxDB]*db.DB) error { return err } +func (app *IntfApp) translateAction(dbs [db.MaxDB]*db.DB) error { + err := errors.New("Not supported") + return err +} + func (app *IntfApp) translateSubscribe(dbs [db.MaxDB]*db.DB, path string) (*notificationOpts, *notificationInfo, error) { app.appDB = dbs[db.ApplDB] pathInfo := NewPathInfo(path) @@ -470,6 +475,13 @@ func (app *IntfApp) processGet(dbs [db.MaxDB]*db.DB) (GetResponse, error) { return GetResponse{Payload: payload}, err } +func (app *IntfApp) processAction(dbs [db.MaxDB]*db.DB) (ActionResponse, error) { + var resp ActionResponse + err := errors.New("Not implemented") + + return resp, err +} + /* Checking IP adderss is v4 */ func validIPv4(ipAddress string) bool { ipAddress = strings.Trim(ipAddress, " ") diff --git a/translib/lldp_app.go b/translib/lldp_app.go index 29af42cf810e..08a4109c7630 100644 --- a/translib/lldp_app.go +++ b/translib/lldp_app.go @@ -132,6 +132,11 @@ func (app *lldpApp) translateGet(dbs [db.MaxDB]*db.DB) error { return err } +func (app *lldpApp) translateAction(dbs [db.MaxDB]*db.DB) error { + err := errors.New("Not supported") + return err +} + func (app *lldpApp) translateSubscribe(dbs [db.MaxDB]*db.DB, path string) (*notificationOpts, *notificationInfo, error) { pathInfo := NewPathInfo(path) notifInfo := notificationInfo{dbno: db.ApplDB} @@ -254,6 +259,13 @@ func (app *lldpApp) processGet(dbs [db.MaxDB]*db.DB) (GetResponse, error) { return GetResponse{Payload:payload}, err } +func (app *lldpApp) processAction(dbs [db.MaxDB]*db.DB) (ActionResponse, error) { + var resp ActionResponse + err := errors.New("Not implemented") + + return resp, err +} + /** Helper function to populate JSON response for GET request **/ func (app *lldpApp) getLldpNeighInfoFromInternalMap(ifName *string, ifInfo *ocbinds.OpenconfigLldp_Lldp_Interfaces_Interface) { diff --git a/translib/pfm_app.go b/translib/pfm_app.go index f823d6c4ac64..18a28a81cce0 100644 --- a/translib/pfm_app.go +++ b/translib/pfm_app.go @@ -72,6 +72,11 @@ func (app *PlatformApp) getAppRootObject() (*ocbinds.OpenconfigPlatform_Componen return deviceObj.Components } +func (app *PlatformApp) translateAction(dbs [db.MaxDB]*db.DB) error { + err := errors.New("Not supported") + return err +} + func (app *PlatformApp) translateSubscribe(dbs [db.MaxDB]*db.DB, path string) (*notificationOpts, *notificationInfo, error) { var err error @@ -189,6 +194,14 @@ func (app *PlatformApp) processGet(dbs [db.MaxDB]*db.DB) (GetResponse, error) { return GetResponse{Payload: payload}, err } +func (app *PlatformApp) processAction(dbs [db.MaxDB]*db.DB) (ActionResponse, error) { + var resp ActionResponse + err := errors.New("Not implemented") + + return resp, err +} + + /////////////////////////// diff --git a/translib/sys_app.go b/translib/sys_app.go index 25970d4a239a..950cb2b822ba 100644 --- a/translib/sys_app.go +++ b/translib/sys_app.go @@ -76,6 +76,11 @@ func (app *SysApp) getAppRootObject() *ocbinds.OpenconfigSystem_System { return deviceObj.System } +func (app *SysApp) translateAction(dbs [db.MaxDB]*db.DB) error { + err := errors.New("Not supported") + return err +} + func (app *SysApp) translateSubscribe(dbs [db.MaxDB]*db.DB, path string) (*notificationOpts, *notificationInfo, error) { var err error @@ -339,3 +344,11 @@ func (app *SysApp) processGet(dbs [db.MaxDB]*db.DB) (GetResponse, error) { } return GetResponse{Payload: payload}, err } + +func (app *SysApp) processAction(dbs [db.MaxDB]*db.DB) (ActionResponse, error) { + var resp ActionResponse + err := errors.New("Not implemented") + + return resp, err +} + diff --git a/translib/translib.go b/translib/translib.go index 5cf4719eeb7e..c222503ceb77 100644 --- a/translib/translib.go +++ b/translib/translib.go @@ -104,6 +104,23 @@ type ActionResponse struct { ErrSrc ErrSource } +type BulkRequest struct { + DeleteRequest []SetRequest + ReplaceRequest []SetRequest + UpdateRequest []SetRequest + CreateRequest []SetRequest + User UserRoles + AuthEnabled bool + ClientVersion Version +} + +type BulkResponse struct { + DeleteResponse []SetResponse + ReplaceResponse []SetResponse + UpdateResponse []SetResponse + CreateResponse []SetResponse +} + type SubscribeRequest struct { Paths []string Q *queue.PriorityQueue @@ -478,8 +495,293 @@ func Get(req GetRequest) (GetResponse, error) { } func Action(req ActionRequest) (ActionResponse, error) { + var payload []byte var resp ActionResponse + path := req.Path + + + log.Info("Received Action request for path = ", path) + + app, appInfo, err := getAppModule(path, req.ClientVersion) + + if err != nil { + resp = ActionResponse{Payload: payload, ErrSrc: ProtoErr} + return resp, err + } + + aInfo := *appInfo + + aInfo.isNative = true + + err = appInitialize(app, &aInfo, path, &req.Payload, nil, GET) + + if err != nil { + resp = ActionResponse{Payload: payload, ErrSrc: AppErr} + return resp, err + } + + writeMutex.Lock() + defer writeMutex.Unlock() + + isGetCase := false + dbs, err := getAllDbs(isGetCase) + + if err != nil { + resp = ActionResponse{Payload: payload, ErrSrc: ProtoErr} + return resp, err + } + + defer closeAllDbs(dbs[:]) + + err = (*app).translateAction(dbs) + + if err != nil { + resp = ActionResponse{Payload: payload, ErrSrc: AppErr} + return resp, err + } + + resp, err = (*app).processAction(dbs) + + return resp, err +} + +func Bulk(req BulkRequest) (BulkResponse, error) { var err error + var keys []db.WatchKeys + var errSrc ErrSource + + delResp := make([]SetResponse, len(req.DeleteRequest)) + replaceResp := make([]SetResponse, len(req.ReplaceRequest)) + updateResp := make([]SetResponse, len(req.UpdateRequest)) + createResp := make([]SetResponse, len(req.CreateRequest)) + + resp := BulkResponse{DeleteResponse: delResp, + ReplaceResponse: replaceResp, + UpdateResponse: updateResp, + CreateResponse: createResp} + + writeMutex.Lock() + defer writeMutex.Unlock() + + isWriteDisabled := false + d, err := db.NewDB(getDBOptions(db.ConfigDB, isWriteDisabled)) + + if err != nil { + return resp, err + } + + defer d.DeleteDB() + + //Start the transaction without any keys or tables to watch will be added later using AppendWatchTx + err = d.StartTx(nil, nil) + + if err != nil { + return resp, err + } + + for i := range req.DeleteRequest { + path := req.DeleteRequest[i].Path + + log.Info("Delete request received with path =", path) + + app, appInfo, err := getAppModule(path, req.DeleteRequest[i].ClientVersion) + + if err != nil { + errSrc = ProtoErr + goto BulkDeleteError + } + + err = appInitialize(app, appInfo, path, nil, nil, DELETE) + + if err != nil { + errSrc = AppErr + goto BulkDeleteError + } + + keys, err = (*app).translateDelete(d) + + if err != nil { + errSrc = AppErr + goto BulkDeleteError + } + + err = d.AppendWatchTx(keys, appInfo.tablesToWatch) + + if err != nil { + errSrc = AppErr + goto BulkDeleteError + } + + resp.DeleteResponse[i], err = (*app).processDelete(d) + + if err != nil { + errSrc = AppErr + } + + BulkDeleteError: + + if err != nil { + d.AbortTx() + resp.DeleteResponse[i].ErrSrc = errSrc + resp.DeleteResponse[i].Err = err + return resp, err + } + } + + for i := range req.ReplaceRequest { + path := req.ReplaceRequest[i].Path + payload := req.ReplaceRequest[i].Payload + + log.Info("Replace request received with path =", path) + + app, appInfo, err := getAppModule(path, req.ReplaceRequest[i].ClientVersion) + + if err != nil { + errSrc = ProtoErr + goto BulkReplaceError + } + + log.Info("Bulk replace request received with path =", path) + log.Info("Bulk replace request received with payload =", string(payload)) + + err = appInitialize(app, appInfo, path, &payload, nil, REPLACE) + + if err != nil { + errSrc = AppErr + goto BulkReplaceError + } + + keys, err = (*app).translateReplace(d) + + if err != nil { + errSrc = AppErr + goto BulkReplaceError + } + + err = d.AppendWatchTx(keys, appInfo.tablesToWatch) + + if err != nil { + errSrc = AppErr + goto BulkReplaceError + } + + resp.ReplaceResponse[i], err = (*app).processReplace(d) + + if err != nil { + errSrc = AppErr + } + + BulkReplaceError: + + if err != nil { + d.AbortTx() + resp.ReplaceResponse[i].ErrSrc = errSrc + resp.ReplaceResponse[i].Err = err + return resp, err + } + } + + for i := range req.UpdateRequest { + path := req.UpdateRequest[i].Path + payload := req.UpdateRequest[i].Payload + + log.Info("Update request received with path =", path) + + app, appInfo, err := getAppModule(path, req.UpdateRequest[i].ClientVersion) + + if err != nil { + errSrc = ProtoErr + goto BulkUpdateError + } + + err = appInitialize(app, appInfo, path, &payload, nil, UPDATE) + + if err != nil { + errSrc = AppErr + goto BulkUpdateError + } + + keys, err = (*app).translateUpdate(d) + + if err != nil { + errSrc = AppErr + goto BulkUpdateError + } + + err = d.AppendWatchTx(keys, appInfo.tablesToWatch) + + if err != nil { + errSrc = AppErr + goto BulkUpdateError + } + + resp.UpdateResponse[i], err = (*app).processUpdate(d) + + if err != nil { + errSrc = AppErr + } + + BulkUpdateError: + + if err != nil { + d.AbortTx() + resp.UpdateResponse[i].ErrSrc = errSrc + resp.UpdateResponse[i].Err = err + return resp, err + } + } + + for i := range req.CreateRequest { + path := req.CreateRequest[i].Path + payload := req.CreateRequest[i].Payload + + log.Info("Create request received with path =", path) + + app, appInfo, err := getAppModule(path, req.CreateRequest[i].ClientVersion) + + if err != nil { + errSrc = ProtoErr + goto BulkCreateError + } + + err = appInitialize(app, appInfo, path, &payload, nil, CREATE) + + if err != nil { + errSrc = AppErr + goto BulkCreateError + } + + keys, err = (*app).translateCreate(d) + + if err != nil { + errSrc = AppErr + goto BulkCreateError + } + + err = d.AppendWatchTx(keys, appInfo.tablesToWatch) + + if err != nil { + errSrc = AppErr + goto BulkCreateError + } + + resp.CreateResponse[i], err = (*app).processCreate(d) + + if err != nil { + errSrc = AppErr + } + + BulkCreateError: + + if err != nil { + d.AbortTx() + resp.CreateResponse[i].ErrSrc = errSrc + resp.CreateResponse[i].Err = err + return resp, err + } + } + + err = d.CommitTx() return resp, err }