diff --git a/glusterd2/commands/volumes/volume-create.go b/glusterd2/commands/volumes/volume-create.go index 8ddd4db53..9a87f498c 100644 --- a/glusterd2/commands/volumes/volume-create.go +++ b/glusterd2/commands/volumes/volume-create.go @@ -11,6 +11,7 @@ import ( "github.com/gluster/glusterd2/glusterd2/gdctx" restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils" "github.com/gluster/glusterd2/glusterd2/transaction" + transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gluster/glusterd2/pkg/api" gderrors "github.com/gluster/glusterd2/pkg/errors" @@ -154,7 +155,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) { return } - txn, err := transaction.NewTxnWithLocks(ctx, req.Name) + txn, err := transactionv2.NewTxnWithLocks(ctx, req.Name) if err != nil { status, err := restutils.ErrToStatusCode(err) restutils.SendHTTPError(ctx, w, status, err) @@ -175,6 +176,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) { }, { DoFunc: "vol-create.ValidateBricks", + Sync: true, Nodes: nodes, }, { diff --git a/glusterd2/main.go b/glusterd2/main.go index b8cf3351f..5a8eac8a3 100644 --- a/glusterd2/main.go +++ b/glusterd2/main.go @@ -14,6 +14,7 @@ import ( "github.com/gluster/glusterd2/glusterd2/peer" "github.com/gluster/glusterd2/glusterd2/servers" "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transactionv2" gdutils "github.com/gluster/glusterd2/glusterd2/utils" "github.com/gluster/glusterd2/glusterd2/xlator" "github.com/gluster/glusterd2/pkg/errors" @@ -101,6 +102,7 @@ func main() { log.WithError(err).Fatal("Failed to initialize store (etcd client)") } + transaction.StartTxnEngine() // Start the events framework after store is up if err := events.Start(); err != nil { log.WithError(err).Fatal("Failed to start internal events framework") @@ -146,6 +148,7 @@ func main() { fallthrough case unix.SIGINT: log.Info("Received SIGTERM. Stopping GlusterD") + transaction.StopTxnEngine() super.Stop() events.Stop() store.Close() diff --git a/glusterd2/transaction/context.go b/glusterd2/transaction/context.go index 5e58b0894..40d4c1077 100644 --- a/glusterd2/transaction/context.go +++ b/glusterd2/transaction/context.go @@ -33,29 +33,31 @@ type TxnCtx interface { // Logger returns the Logrus logger associated with the context Logger() log.FieldLogger - // commit writes all locally cached keys and values into the store using - // a single etcd transaction. This is for internal use by the txn framework - // and hence isn't exported. - commit() error + // Commit writes all locally cached keys and values into the store using + // a single etcd transaction. + Commit() error + + // SyncCache synchronizes the locally cached keys and values from the store + SyncCache() error } // Tctx represents structure for transaction context type Tctx struct { - config *txnCtxConfig // this will be marshalled and sent on wire + config *TxnCtxConfig // this will be marshalled and sent on wire logger log.FieldLogger readSet map[string][]byte // cached responses from store readCacheDirty bool writeSet map[string]string // to be written to store } -// txnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx +// TxnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx // on receiver's end.
-type txnCtxConfig struct { +type TxnCtxConfig struct { LogFields log.Fields StorePrefix string } -func newCtx(config *txnCtxConfig) *Tctx { +func newCtx(config *TxnCtxConfig) *Tctx { return &Tctx{ config: config, logger: log.StandardLogger().WithFields(config.LogFields), @@ -65,6 +68,11 @@ } } +// NewCtx returns a transaction context from given config +func NewCtx(config *TxnCtxConfig) *Tctx { + return newCtx(config) +} + // Set attaches the given key-value pair to the context. // If the key exists, the value will be updated. func (c *Tctx) Set(key string, value interface{}) error { @@ -86,9 +94,22 @@ return nil } -// commit writes all locally cached keys and values into the store using +// SyncCache synchronizes the locally cached keys and values from the store +func (c *Tctx) SyncCache() error { + resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + + for _, kv := range resp.Kvs { + c.readSet[string(kv.Key)] = kv.Value + } + return nil +} + +// Commit writes all locally cached keys and values into the store using // a single etcd transaction. -func (c *Tctx) commit() error { +func (c *Tctx) Commit() error { if len(c.writeSet) == 0 { return nil @@ -120,6 +141,7 @@ func (c *Tctx) commit() error { expTxn.Add("txn_ctx_store_commit", 1) + c.writeSet = make(map[string]string) c.readCacheDirty = true return nil @@ -139,15 +161,10 @@ func (c *Tctx) Get(key string, value interface{}) error { // cache all keys and values from the store on the first call to Get if c.readCacheDirty { - resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix()) - if err != nil { + if err := c.SyncCache(); err != nil { c.logger.WithError(err).WithField("key", key).Error("failed to get key from transaction context") return err } - expTxn.Add("txn_ctx_store_get", 1) - for _, kv := range resp.Kvs { - c.readSet[string(kv.Key)] = kv.Value - } c.readCacheDirty = false } diff --git a/glusterd2/transaction/lock.go b/glusterd2/transaction/lock.go index 4fe6b0c65..3c65224ff 100644 --- a/glusterd2/transaction/lock.go +++ b/glusterd2/transaction/lock.go @@ -85,8 +85,8 @@ func CreateLockSteps(key string) (*Step, *Step, error) { return nil, nil, err } - lockStep := &Step{lockFunc, unlockFunc, []uuid.UUID{gdctx.MyUUID}, false} - unlockStep := &Step{unlockFunc, "", []uuid.UUID{gdctx.MyUUID}, false} + lockStep := &Step{DoFunc: lockFunc, UndoFunc: unlockFunc, Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} + unlockStep := &Step{DoFunc: unlockFunc, UndoFunc: "", Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} return lockStep, unlockStep, nil } @@ -149,13 +149,16 @@ func CreateLockFuncs(key string) (LockUnlockFunc, LockUnlockFunc) { return lockFunc, unlockFunc } -func (t *Txn) lock(lockID string) error { +// Locks is a collection of cluster-wide transaction locks +type Locks map[string]*concurrency.Mutex + +func (l Locks) lock(lockID string) error { // Ensure that no prior lock exists for the given lockID in this transaction - if _, ok := t.locks[lockID]; ok { + if _, ok := l[lockID]; ok { return ErrLockExists } - logger := t.Ctx.Logger().WithField("lockID", lockID) + logger := log.WithField("lockID", lockID) logger.Debug("attempting to obtain lock") key := lockPrefix + lockID @@ -169,7 +172,7 @@ case nil: logger.Debug("lock obtained") // Attach lock to the transaction - t.locks[lockID] = locker +
l[lockID] = locker case context.DeadlineExceeded: // Propagate this all the way back to the client as a HTTP 409 response @@ -185,12 +188,12 @@ func (t *Txn) lock(lockID string) error { // Lock obtains a cluster wide transaction lock on the given lockID/lockIDs, // and attaches the obtained locks to the transaction -func (t *Txn) Lock(lockID string, lockIDs ...string) error { - if err := t.lock(lockID); err != nil { +func (l Locks) Lock(lockID string, lockIDs ...string) error { + if err := l.lock(lockID); err != nil { return err } for _, id := range lockIDs { - if err := t.lock(id); err != nil { + if err := l.lock(id); err != nil { return err } } diff --git a/glusterd2/transaction/rpc-service.go b/glusterd2/transaction/rpc-service.go index 0c49d0414..08716adef 100644 --- a/glusterd2/transaction/rpc-service.go +++ b/glusterd2/transaction/rpc-service.go @@ -59,7 +59,7 @@ func (p *txnSvc) RunStep(rpcCtx context.Context, req *TxnStepReq) (*TxnStepResp, goto End } - if err = ctx.commit(); err != nil { + if err = ctx.Commit(); err != nil { logger.WithError(err).Error("failed to commit txn context to store") } diff --git a/glusterd2/transaction/step.go b/glusterd2/transaction/step.go index 784d7c02e..69a89b61c 100644 --- a/glusterd2/transaction/step.go +++ b/glusterd2/transaction/step.go @@ -27,6 +27,7 @@ type Step struct { UndoFunc string Nodes []uuid.UUID Skip bool + Sync bool } var ( @@ -136,7 +137,7 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod var err error if uuid.Equal(node, gdctx.MyUUID) { - err = runStepFuncLocally(origCtx, stepName, ctx) + err = RunStepFuncLocally(origCtx, stepName, ctx) } else { // remote node err = runStepOn(origCtx, stepName, node, ctx) @@ -145,7 +146,8 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod respCh <- stepPeerResp{node, err} } -func runStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error { +// RunStepFuncLocally runs a step func on local node +func RunStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error { var err error @@ -163,7 +165,7 @@ func runStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) er if err = stepFunc(ctx); err == nil { // if step function executes successfully, commit the // results to the store - err = ctx.commit() + err = ctx.Commit() } } else { err = ErrStepFuncNotFound diff --git a/glusterd2/transaction/transaction.go b/glusterd2/transaction/transaction.go index 3cb5c3bad..8a8698ae2 100644 --- a/glusterd2/transaction/transaction.go +++ b/glusterd2/transaction/transaction.go @@ -26,7 +26,7 @@ var expTxn = expvar.NewMap("txn") // Txn is a set of steps type Txn struct { id uuid.UUID - locks map[string]*concurrency.Mutex + locks Locks reqID uuid.UUID storePrefix string @@ -49,7 +49,7 @@ func NewTxn(ctx context.Context) *Txn { t.reqID = gdctx.GetReqID(ctx) t.locks = make(map[string]*concurrency.Mutex) t.storePrefix = txnPrefix + t.id.String() + "/" - config := &txnCtxConfig{ + config := &TxnCtxConfig{ LogFields: log.Fields{ "txnid": t.id.String(), "reqid": t.reqID.String(), @@ -68,7 +68,7 @@ func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) { t := NewTxn(ctx) for _, id := range lockIDs { - if err := t.Lock(id); err != nil { + if err := t.locks.Lock(id); err != nil { t.Done() return nil, err } @@ -125,7 +125,7 @@ func (t *Txn) Do() error { expTxn.Add("initiated_txn_in_progress", 1) // commit txn.Ctx.Set()s done in REST handlers to the store - if err := t.Ctx.commit(); err != nil { + 
if err := t.Ctx.Commit(); err != nil { return err } diff --git a/glusterd2/transactionv2/engine.go b/glusterd2/transactionv2/engine.go new file mode 100644 index 000000000..cb9031f57 --- /dev/null +++ b/glusterd2/transactionv2/engine.go @@ -0,0 +1,261 @@ +package transaction + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" +) + +const ( + // PendingTxnPrefix is the etcd namespace under which all pending txns are stored + PendingTxnPrefix = "pending-transaction/" + txnSyncTimeout = time.Second * 10 +) + +// TransactionEngine is responsible for executing newly added txns +var TransactionEngine *TxnEngine + +// TxnEngine executes the given transaction across the cluster. +// It makes use of etcd as the means of communication between nodes. +type TxnEngine struct { + stop chan struct{} + stopOnce sync.Once + selfNodeID uuid.UUID + stepRunner StepRunner +} + +// NewTxnEngine creates a TxnEngine +func NewTxnEngine() *TxnEngine { + return &TxnEngine{ + stop: make(chan struct{}), + selfNodeID: gdctx.MyUUID, + stepRunner: newStepRunner(), + } +} + +// Run starts the TxnEngine and blocks until it is stopped. +func (txnEng *TxnEngine) Run() { + log.Info("running txn engine") + + go UntilStop(txnEng.HandleTransaction, 0, txnEng.stop) + go UntilStop(txnEng.HandleFailedTxn, 0, txnEng.stop) + + <-txnEng.stop + log.Info("txn engine stopped") } + +// HandleTransaction executes txns newly added to the store. It keeps watching the +// `pending-transaction` namespace and executes any new txn added under it. +func (txnEng *TxnEngine) HandleTransaction() { + txnChan := GlobalTxnManager.WatchTxn(txnEng.stop) + + for { + select { + case <-txnEng.stop: + return + case txn, ok := <-txnChan: + if !ok { + return + } + log.Debugf("received a txn %+v\n", txn) + go txnEng.Execute(context.Background(), txn) + } + } +} + +func (txnEng *TxnEngine) isInitiator(txn *Txn) bool { + return uuid.Equal(txn.Initiator, txnEng.selfNodeID) +} + +// Execute will run a given txn +func (txnEng *TxnEngine) Execute(ctx context.Context, txn *Txn) { + var shouldExecute bool + for _, node := range txn.Nodes { + if uuid.Equal(node, txnEng.selfNodeID) { + txn.Ctx.Logger().WithField("peerID", txnEng.selfNodeID.String()).Info("executing txn on peer") + shouldExecute = true + break + } + } + + if !shouldExecute { + txn.Ctx.Logger().WithField("peerID", txnEng.selfNodeID.String()).Info("peer is not involved in this txn") + return + } + + if txnEng.isInitiator(txn) { + if err := WithClusterLocks(txn.Locks...)(txn); err != nil { + txn.Ctx.Logger().WithError(err).Error("error in acquiring cluster lock") + return + } + defer txn.releaseLocks() + } + + txnStatus, err := GlobalTxnManager.GetTxnStatus(txn.ID, txnEng.selfNodeID) + if err != nil { + txn.Ctx.Logger().WithError(err).Error("error in getting txn status") + return + } + + switch txnStatus.State { + case txnPending: + if err := txnEng.executePendingTxn(ctx, txn); err != nil { + status := TxnStatus{State: txnFailed, TxnErr: NewTransactionError(err, errTxnFailed, txnEng.selfNodeID)} + if txnErr, ok := err.(*TransactionError); ok { + status.TxnErr = txnErr + } + GlobalTxnManager.UpDateTxnStatus(status, txn.ID, txnEng.selfNodeID) + return + } + txn.Ctx.Logger().Info("txn succeeded") + GlobalTxnManager.UpDateTxnStatus(TxnStatus{State: txnSucceeded}, txn.ID,
txnEng.selfNodeID) + } + return +} + +// HandleFailedTxn keeps watching the store for failed txns. If it receives a failed +// txn, it rolls back all executed steps of that txn. +func (txnEng *TxnEngine) HandleFailedTxn() { + failedTxnChan := GlobalTxnManager.WatchFailedTxn(txnEng.stop, txnEng.selfNodeID) + + for { + select { + case <-txnEng.stop: + return + case failedTxn, ok := <-failedTxnChan: + if !ok { + return + } + + lastStepIndex, err := GlobalTxnManager.GetLastExecutedStep(failedTxn.ID, txnEng.selfNodeID) + if err != nil || lastStepIndex == -1 { + continue + } + failedTxn.Ctx.Logger().Debugf("received a failed txn, rolling back changes") + + for i := lastStepIndex; i >= 0; i-- { + err := txnEng.stepRunner.RollBackStep(context.Background(), failedTxn.Steps[i], failedTxn.Ctx) + if err != nil { + failedTxn.Ctx.Logger().WithError(err).WithField("step", failedTxn.Steps[i]).Error("failed in rolling back step") + } + } + GlobalTxnManager.UpdateLastExecutedStep(-1, failedTxn.ID, txnEng.selfNodeID) + } + } +} + +func (txnEng *TxnEngine) executePendingTxn(ctx context.Context, txn *Txn) error { + var ( + stopch = make(chan struct{}) + txnStatusChan = GlobalTxnManager.WatchTxnStatus(stopch, txn.ID, txnEng.selfNodeID) + ) + defer close(stopch) + + for i, step := range txn.Steps { + if step.Sync { + txn.Ctx.Logger().WithField("stepname", step.DoFunc).Info("synchronizing txn step") + stopSync := make(chan struct{}) + if err := txnEng.syncTxn(txn, i, stopSync, txnStatusChan); err != nil { + close(stopSync) + txn.Ctx.Logger().WithError(err).Errorf("encountered an error in synchronizing txn step %+v", step) + return err + } + close(stopSync) + txn.Ctx.Logger().Info("txn step synchronized") + } + + if err := txnEng.stepRunner.RunStep(ctx, step, txn.Ctx); err != nil { + txn.Ctx.Logger().WithError(err).Errorf("failed in executing step %+v", step) + GlobalTxnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + return err + } + + select { + case <-ctx.Done(): + GlobalTxnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + return ctx.Err() + case status := <-txnStatusChan: + if status.State == txnFailed { + GlobalTxnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + return status.TxnErr + } + default: + } + + GlobalTxnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + GlobalTxnManager.UpDateTxnStatus(TxnStatus{State: txnRunning}, txn.ID, txnEng.selfNodeID) + } + return nil +} + +func (txnEng *TxnEngine) syncTxn(txn *Txn, syncStep int, stopCh <-chan struct{}, txnStatusChan <-chan TxnStatus) error { + var ( + nodesSynced int64 + syncTimer = time.NewTimer(txnSyncTimeout) + ) + + for _, nodeID := range txn.Nodes { + go func(syncStep int, txnID uuid.UUID, nodeID uuid.UUID) { + lastStepWatchChan := GlobalTxnManager.WatchLastExecutedStep(stopCh, txnID, nodeID) + for { + select { + case <-stopCh: + return + case lastStep := <-lastStepWatchChan: + if lastStep == syncStep-1 { + atomic.AddInt64(&nodesSynced, 1) + return + } + } + } + }(syncStep, txn.ID, nodeID) + } + + for { + select { + case <-syncTimer.C: + return NewTransactionSyncError(txnEng.selfNodeID) + case <-stopCh: + return nil + case status := <-txnStatusChan: + if status.State == txnFailed { + return status.TxnErr + } + default: + } + + if count := atomic.LoadInt64(&nodesSynced); count == int64(len(txn.Nodes)) { + return nil + } + } +} + +// Stop will stop a running Txn Engine +func (txnEng *TxnEngine) Stop() { + log.Info("stopping txn engine") + txnEng.stopOnce.Do(func() { + close(txnEng.stop) + })
+} + +// StartTxnEngine creates a new Txn Engine and starts running it +func StartTxnEngine() { + TransactionEngine = NewTxnEngine() + GlobalTxnManager = NewTxnManager(store.Store.Watcher) + go TransactionEngine.Run() +} + +// StopTxnEngine stops the Txn Engine +func StopTxnEngine() { + if TransactionEngine != nil { + TransactionEngine.Stop() + } +} diff --git a/glusterd2/transactionv2/errors.go b/glusterd2/transactionv2/errors.go new file mode 100644 index 000000000..8ea605119 --- /dev/null +++ b/glusterd2/transactionv2/errors.go @@ -0,0 +1,62 @@ +package transaction + +import "github.com/pborman/uuid" + +const ( + errTxnTimeout int = iota + 1 + errTxnFailed + errTxnSync + errTxnSyncTimeout +) + +// TransactionError represents an error in executing a txn +type TransactionError struct { + Code int + Reason string + NodeId uuid.UUID +} + +var errCodeToMessage = map[int]string{ + errTxnTimeout: "transaction timed out", + errTxnFailed: "transaction failed", + errTxnSync: "transaction sync error", + errTxnSyncTimeout: "timeout in synchronizing txn", +} + +// Error returns the reason of the txn failure +func (t *TransactionError) Error() string { + return t.Reason +} + +// NewTransactionError returns a new TransactionError +func NewTransactionError(err error, code int, nodeID uuid.UUID) *TransactionError { + return &TransactionError{ + Code: code, + Reason: err.Error(), + NodeId: nodeID, + } +} + +// NewTransactionTimeoutError returns a TransactionError with the timeout error code +func NewTransactionTimeoutError(nodeID uuid.UUID) *TransactionError { + return &TransactionError{ + Code: errTxnTimeout, + NodeId: nodeID, + Reason: errCodeToMessage[errTxnTimeout], + } +} + +// NewTransactionSyncError returns a TransactionError with the txn sync timeout error code +func NewTransactionSyncError(nodeID uuid.UUID) *TransactionError { + return &TransactionError{ + Code: errTxnSyncTimeout, + NodeId: nodeID, + Reason: errCodeToMessage[errTxnSyncTimeout], + } +} + +// IsTransactionError reports whether the given error is of type TransactionError +func IsTransactionError(err error) (*TransactionError, bool) { + txnErr, ok := err.(*TransactionError) + return txnErr, ok +} diff --git a/glusterd2/transactionv2/steprunner.go b/glusterd2/transactionv2/steprunner.go new file mode 100644 index 000000000..15bf54be8 --- /dev/null +++ b/glusterd2/transactionv2/steprunner.go @@ -0,0 +1,66 @@ +package transaction + +import ( + "context" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/transaction" + + "github.com/pborman/uuid" +) + +// StepRunner is an interface for running and rolling back a step on the local node +type StepRunner interface { + RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error + RollBackStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error +} + +type stepManager struct { + selfNodeID uuid.UUID +} + +func newStepRunner() StepRunner { + return &stepManager{ + selfNodeID: gdctx.MyUUID, + } +} + +func (sm *stepManager) shouldRunStep(step *transaction.Step) bool { + if step.Skip { + return false + } + + for _, id := range step.Nodes { + if uuid.Equal(sm.selfNodeID, id) { + return true + } + } + return false +} + +func (sm *stepManager) runStep(ctx context.Context, stepName string, txnCtx transaction.TxnCtx) error { + if err := txnCtx.SyncCache(); err != nil { + return err + } + return transaction.RunStepFuncLocally(ctx, stepName, txnCtx) +} + +// RollBackStep will roll back a given step on the local node +func (sm *stepManager) RollBackStep(ctx
context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error { + if !sm.shouldRunStep(step) { + return nil + } + + if step.UndoFunc != "" { + return sm.runStep(ctx, step.UndoFunc, txnCtx) + } + return nil +} + +// RunStep will execute the step on the local node +func (sm *stepManager) RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error { + if !sm.shouldRunStep(step) { + return nil + } + return sm.runStep(ctx, step.DoFunc, txnCtx) +} diff --git a/glusterd2/transactionv2/transaction.go b/glusterd2/transactionv2/transaction.go new file mode 100644 index 000000000..af192325c --- /dev/null +++ b/glusterd2/transactionv2/transaction.go @@ -0,0 +1,255 @@ +// Package transaction implements a distributed transaction handling framework +package transaction + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transaction" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" +) + +const ( + txnPrefix = "transaction/" + txnTimeOut = time.Second * 30 +) + +// TxnOptFunc receives a Txn and overrides its members +type TxnOptFunc func(*Txn) error + +// Txn is a set of steps +type Txn struct { + locks transaction.Locks + + // Nodes is the union of all the TxnStep.Nodes and is implicitly + // set in Txn.Do(). This list is used to determine liveness of the + // nodes before running the transaction steps. + Nodes []uuid.UUID `json:"nodes"` + StorePrefix string `json:"store_prefix"` + ID uuid.UUID `json:"id"` + Locks []string `json:"locks"` + ReqID uuid.UUID `json:"req_id"` + Ctx transaction.TxnCtx `json:"ctx"` + Steps []*transaction.Step `json:"steps"` + DontCheckAlive bool `json:"dont_check_alive"` + DisableRollback bool `json:"disable_rollback"` + Initiator uuid.UUID `json:"initiator"` + + success chan struct{} + error chan error +} + +// NewTxn returns an initialized Txn without any steps +func NewTxn(ctx context.Context) *Txn { + t := new(Txn) + + t.ID = uuid.NewRandom() + t.ReqID = gdctx.GetReqID(ctx) + t.locks = make(map[string]*concurrency.Mutex) + t.StorePrefix = txnPrefix + t.ID.String() + "/" + config := &transaction.TxnCtxConfig{ + LogFields: log.Fields{ + "txnid": t.ID.String(), + "reqid": t.ReqID.String(), + }, + StorePrefix: t.StorePrefix, + } + t.Ctx = transaction.NewCtx(config) + t.Initiator = gdctx.MyUUID + t.Ctx.Logger().Debug("new transaction created") + return t +} + +// NewTxnWithLocks returns an empty Txn with the given lockIDs recorded; the +// locks themselves are acquired by the initiator's engine before execution +func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) { + t := NewTxn(ctx) + t.Locks = lockIDs + return t, nil +} + +// WithClusterLocks obtains cluster-wide locks on the given IDs for a txn +func WithClusterLocks(lockIDs ...string) TxnOptFunc { + return func(t *Txn) error { + for _, id := range lockIDs { + if err := t.locks.Lock(id); err != nil { + t.releaseLocks() + return err + } + } + return nil + } +} + +func (t *Txn) releaseLocks() { + for _, locker := range t.locks { + locker.Unlock(context.Background()) + } +} + +// Done must be called after a transaction ends. With the v2 engine it is +// currently a no-op: cluster locks are released by the engine and the +// transaction namespace is cleaned up by done() once the txn completes. +func (t *Txn) Done() { + +} + +func (t *Txn) done() { + if _, err := store.Delete(context.TODO(), t.StorePrefix, clientv3.WithPrefix()); err != nil { + t.Ctx.Logger().WithError(err).WithField("key",
t.StorePrefix).Error("Failed to remove transaction namespace from store") + } + +} + +func (t *Txn) checkAlive() error { + + if len(t.Nodes) == 0 { + for _, s := range t.Steps { + t.Nodes = append(t.Nodes, s.Nodes...) + } + } + t.Nodes = nodesUnion(t.Nodes) + + for _, node := range t.Nodes { + // TODO: Using prefixed query, get all alive nodes in a single etcd query + if _, online := store.Store.IsNodeAlive(node); !online { + return fmt.Errorf("node %s is probably down", node.String()) + } + } + + return nil +} + +// Do runs the transaction on the cluster +func (t *Txn) Do() error { + var ( + stop = make(chan struct{}) + timer = time.NewTimer(txnTimeOut) + ) + + t.success = make(chan struct{}) + t.error = make(chan error) + + defer timer.Stop() + + if !t.DontCheckAlive { + if err := t.checkAlive(); err != nil { + return err + } + } + + t.Ctx.Logger().Debug("Starting transaction") + + go t.waitForCompletion(stop) + + GlobalTxnManager.UpDateTxnStatus(TxnStatus{State: txnPending}, t.ID, t.Nodes...) + + // commit txn.Ctx.Set()s done in REST handlers to the store + if err := t.Ctx.Commit(); err != nil { + return err + } + + t.Ctx.Logger().Debug("adding txn to store") + if err := GlobalTxnManager.AddTxn(t); err != nil { + return err + } + t.Ctx.Logger().Debug("waiting for txn to be cleaned up") + + select { + case <-t.success: + close(stop) + t.done() + GlobalTxnManager.RemoveTransaction(t.ID) + t.Ctx.Logger().Info("txn succeeded on all nodes, cleaned up from store") + case err := <-t.error: + t.Ctx.Logger().WithError(err).Error("error in executing txn, marking as failure") + close(stop) + txnStatus := TxnStatus{txnFailed, &TransactionError{Code: errTxnFailed, Reason: err.Error()}} + if txnErr, ok := err.(*TransactionError); ok { + txnStatus.TxnErr = txnErr + } + GlobalTxnManager.UpDateTxnStatus(txnStatus, t.ID, t.Nodes...) + return err + case <-timer.C: + t.Ctx.Logger().Error("timed out waiting for txn completion, marking as failure") + close(stop) + for _, nodeID := range t.Nodes { + txnStatus := TxnStatus{State: txnFailed, TxnErr: NewTransactionTimeoutError(nodeID)} + GlobalTxnManager.UpDateTxnStatus(txnStatus, t.ID, nodeID) + } + return NewTransactionTimeoutError(t.Initiator) + } + + return nil +} + +func (t *Txn) waitForCompletion(stopCh <-chan struct{}) { + var nodesSucceeded uint64 + + for _, nodeID := range t.Nodes { + go func(nodeID uuid.UUID, txnID uuid.UUID) { + txnStatusChan := GlobalTxnManager.WatchTxnStatus(stopCh, txnID, nodeID) + for { + select { + case <-stopCh: + return + case status, ok := <-txnStatusChan: + if !ok { + return + } + + log.WithFields(log.Fields{ + "nodeId": nodeID.String(), + "status": fmt.Sprintf("%+v", status), + }).Debug("state received") + + if status.State == txnSucceeded { + atomic.AddUint64(&nodesSucceeded, 1) + return + } else if status.State == txnFailed { + t.error <- status.TxnErr + return + } + } + } + }(nodeID, t.ID) + } + + for { + select { + case <-stopCh: + t.Ctx.Logger().WithFields(log.Fields{ + "total_nodes": len(t.Nodes), + "nodes_succeeded": atomic.LoadUint64(&nodesSucceeded), + }).Debug("wait is over for txn") + return + default: + + } + + if count := atomic.LoadUint64(&nodesSucceeded); count == uint64(len(t.Nodes)) { + t.success <- struct{}{} + return + } + } +} + +// nodesUnion removes duplicate nodes +func nodesUnion(nodes []uuid.UUID) []uuid.UUID { + for i := 0; i < len(nodes); i++ { + for j := i + 1; j < len(nodes); j++ { + if uuid.Equal(nodes[i], nodes[j]) { + nodes = append(nodes[:j], nodes[j+1:]...)
+ j-- + } + } + } + return nodes +} diff --git a/glusterd2/transactionv2/txnmanager.go b/glusterd2/transactionv2/txnmanager.go new file mode 100644 index 000000000..dc1a2788d --- /dev/null +++ b/glusterd2/transactionv2/txnmanager.go @@ -0,0 +1,397 @@ +package transaction + +import ( + "context" + "encoding/json" + "errors" + "path" + "strconv" + "strings" + "sync" + "time" + + "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transaction" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/pborman/uuid" +) + +// GlobalTxnManager stores and manages access to transaction-related data +var GlobalTxnManager TxnManager + +const ( + // TxnStatusPrefix is the etcd key prefix under which the status of a txn is stored for a particular node, + // e.g. key for storing status: pending-transaction/<txnID>/status/<nodeID> + TxnStatusPrefix = "status" + // LastExecutedStepPrefix is the etcd key prefix under which the last step executed on a particular node for a txn is stored, + // e.g. key for storing last executed step on a node: pending-transaction/<txnID>/laststep/<nodeID> + LastExecutedStepPrefix = "laststep" + // etcdTxnTimeout is the timeout for etcd store transactions + etcdTxnTimeout = time.Second * 5 +) + +// TxnManager stores and manages access to transaction related data in +// `pending-transaction` namespace. +type TxnManager interface { + WatchTxn(stopCh <-chan struct{}) <-chan *Txn + GetTxns() []*Txn + AddTxn(txn *Txn) error + GetTxnByUUID(id uuid.UUID) (*Txn, error) + RemoveTransaction(txnID uuid.UUID) error + UpdateLastExecutedStep(index int, txnID uuid.UUID, nodeID uuid.UUID) error + GetLastExecutedStep(txnID uuid.UUID, nodeID uuid.UUID) (int, error) + WatchLastExecutedStep(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan int + WatchFailedTxn(stopCh <-chan struct{}, nodeID uuid.UUID) <-chan *Txn + WatchTxnStatus(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan TxnStatus + GetTxnStatus(txnID uuid.UUID, nodeID uuid.UUID) (TxnStatus, error) + UpDateTxnStatus(state TxnStatus, txnID uuid.UUID, nodeIDs ...uuid.UUID) error +} + +type txnManager struct { + getStoreKey func(...string) string + storeWatcher clientv3.Watcher + mutex sync.Mutex +} + +// NewTxnManager returns a TxnManager +func NewTxnManager(storeWatcher clientv3.Watcher) TxnManager { + tm := &txnManager{ + storeWatcher: storeWatcher, + } + tm.getStoreKey = func(s ...string) string { + key := path.Join(PendingTxnPrefix, path.Join(s...)) + return key + } + return tm +} + +// RemoveTransaction removes a transaction from the `pending-transaction` namespace +func (tm *txnManager) RemoveTransaction(txnID uuid.UUID) error { + _, err := store.Delete(context.TODO(), tm.getStoreKey(txnID.String()), clientv3.WithPrefix()) + return err +} + +// WatchTxnStatus watches the status of a txn on a particular node +func (tm *txnManager) WatchTxnStatus(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan TxnStatus { + var ( + txnStatusChan = make(chan TxnStatus, 10) + key = tm.getStoreKey(txnID.String(), TxnStatusPrefix, nodeID.String()) + ) + + respHandler := func(response clientv3.WatchResponse) { + for _, event := range response.Events { + txnStatus := TxnStatus{} + if err := json.Unmarshal(event.Kv.Value, &txnStatus); err != nil { + continue + } + if !txnStatus.State.Valid() { + continue + } + txnStatusChan <- txnStatus + } + } + + tm.watch(stopCh, key, respHandler, clientv3.WithFilterDelete()) + return txnStatusChan +} + +// WatchTxn watches for txns newly added
to store +func (tm *txnManager) WatchTxn(stopCh <-chan struct{}) <-chan *Txn { + var ( + txnChan = make(chan *Txn, 10) + key = tm.getStoreKey() + opts = []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithFilterDelete()} + ) + + respHandler := func(response clientv3.WatchResponse) { + for _, txn := range tm.watchRespToTxns(response) { + txnChan <- txn + } + } + + tm.watch(stopCh, key, respHandler, opts...) + + return txnChan +} + +// WatchFailedTxn watches for a failed txn on a particular node +func (tm *txnManager) WatchFailedTxn(stopCh <-chan struct{}, nodeID uuid.UUID) <-chan *Txn { + txnChan := make(chan *Txn, 10) + + go func(key string) { + resp, err := store.Get(context.TODO(), key, clientv3.WithPrefix()) + if err != nil { + return + } + for _, txn := range tm.getRespToFailedTxns(resp, nodeID) { + txnChan <- txn + } + }(tm.getStoreKey()) + + respHandler := func(response clientv3.WatchResponse) { + for _, txn := range tm.watchRespToFailedTxns(response, nodeID) { + txnChan <- txn + } + } + + tm.watch(stopCh, tm.getStoreKey(), respHandler, clientv3.WithPrefix(), clientv3.WithFilterDelete()) + return txnChan +} + +func (tm *txnManager) getRespToFailedTxns(resp *clientv3.GetResponse, nodeID uuid.UUID) (txns []*Txn) { + for _, kv := range resp.Kvs { + if txn := tm.kvToFailedTxn(kv, nodeID); txn != nil { + txns = append(txns, txn) + } + } + return +} + +func (tm *txnManager) watchRespToFailedTxns(resp clientv3.WatchResponse, nodeID uuid.UUID) (txns []*Txn) { + for _, event := range resp.Events { + if txn := tm.kvToFailedTxn(event.Kv, nodeID); txn != nil { + txns = append(txns, txn) + } + } + return +} + +func (tm *txnManager) kvToFailedTxn(kv *mvccpb.KeyValue, nodeID uuid.UUID) *Txn { + prefix, nodeid := path.Split(string(kv.Key)) + prefix = path.Clean(prefix) + if uuid.Parse(nodeid) == nil || !strings.HasSuffix(prefix, TxnStatusPrefix) || nodeid != nodeID.String() { + return nil + } + + txnStatus := TxnStatus{} + if err := json.Unmarshal(kv.Value, &txnStatus); err != nil { + return nil + } + + if txnStatus.State != txnFailed || !txnStatus.State.Valid() { + return nil + } + + tokens := strings.Split(prefix, "/") + if len(tokens) != 3 { + return nil + } + + tid := uuid.Parse(tokens[1]) + if tid == nil { + return nil + } + + if txn, err := tm.GetTxnByUUID(tid); err == nil { + return txn + } + + return nil +} + +func (tm *txnManager) watchRespToTxns(resp clientv3.WatchResponse) (txns []*Txn) { + for _, event := range resp.Events { + prefix, id := path.Split(string(event.Kv.Key)) + if uuid.Parse(id) == nil || !strings.HasSuffix(prefix, PendingTxnPrefix) { + continue + } + + txn := &Txn{Ctx: new(transaction.Tctx)} + if err := json.Unmarshal(event.Kv.Value, txn); err != nil { + continue + } + + txn.locks = make(map[string]*concurrency.Mutex) + txns = append(txns, txn) + } + return +} + +// AddTxn adds a txn to the store +func (tm *txnManager) AddTxn(txn *Txn) error { + data, err := json.Marshal(txn) + if err != nil { + return err + } + _, err = store.Put(context.TODO(), tm.getStoreKey(txn.ID.String()), string(data)) + return err +} + +// GetTxnByUUID returns the txn from given ID +func (tm *txnManager) GetTxnByUUID(id uuid.UUID) (*Txn, error) { + key := tm.getStoreKey(id.String()) + resp, err := store.Get(context.TODO(), key) + if err != nil { + return nil, err + } + + if len(resp.Kvs) == 0 { + return nil, errors.New(key + " key not found") + } + + kv := resp.Kvs[0] + + txn := &Txn{Ctx: new(transaction.Tctx)} + if err := json.Unmarshal(kv.Value, txn); err != nil { + return nil, err + } + 
txn.locks = make(map[string]*concurrency.Mutex) + return txn, nil +} + +// GetTxns returns all txns added to the store +func (tm *txnManager) GetTxns() (txns []*Txn) { + resp, err := store.Get(context.TODO(), tm.getStoreKey(), clientv3.WithPrefix()) + if err != nil { + return + } + for _, kv := range resp.Kvs { + _, id := path.Split(string(kv.Key)) + if uuid.Parse(id) == nil { + continue + } + + txn := &Txn{Ctx: new(transaction.Tctx)} + if err := json.Unmarshal(kv.Value, txn); err != nil { + continue + } + txn.locks = make(map[string]*concurrency.Mutex) + txns = append(txns, txn) + } + return +} + +// UpDateTxnStatus updates txn status for given nodes +func (tm *txnManager) UpDateTxnStatus(status TxnStatus, txnID uuid.UUID, nodeIDs ...uuid.UUID) error { + var ( + ctx, cancel = context.WithTimeout(context.Background(), etcdTxnTimeout) + storeMutex = concurrency.NewMutex(store.Store.Session, txnID.String()) + putOps []clientv3.Op + ) + + storeMutex.Lock(ctx) + defer cancel() + defer storeMutex.Unlock(ctx) + + data, err := json.Marshal(status) + if err != nil { + return err + } + + for _, nodeID := range nodeIDs { + key := tm.getStoreKey(txnID.String(), TxnStatusPrefix, nodeID.String()) + putOps = append(putOps, clientv3.OpPut(key, string(data))) + } + + txn, err := store.Txn(ctx).Then(putOps...).Commit() + if err != nil || !txn.Succeeded { + return errors.New("etcd txn to update txn status failed") + } + return nil +} + +// GetTxnStatus returns status of given txn on a particular node +func (tm *txnManager) GetTxnStatus(txnID uuid.UUID, nodeID uuid.UUID) (TxnStatus, error) { + var ( + ctx, cancel = context.WithCancel(context.Background()) + key = tm.getStoreKey(txnID.String(), TxnStatusPrefix, nodeID.String()) + storeMutex = concurrency.NewMutex(store.Store.Session, txnID.String()) + ) + + storeMutex.Lock(ctx) + defer cancel() + defer storeMutex.Unlock(ctx) + + resp, err := store.Get(context.TODO(), key) + if err != nil { + return TxnStatus{State: txnUnknown}, err + } + + if len(resp.Kvs) == 0 { + return TxnStatus{State: txnUnknown}, errors.New(key + " key not found") + } + + txnStatus := TxnStatus{} + kv := resp.Kvs[0] + + if err := json.Unmarshal(kv.Value, &txnStatus); err != nil { + return TxnStatus{State: txnUnknown}, err + } + + if !txnStatus.State.Valid() { + return TxnStatus{State: txnUnknown}, errors.New("invalid txn state") + } + + return txnStatus, nil +} + +// UpdateLastExecutedStep updates the last executed step on a node of a given txn ID +func (tm *txnManager) UpdateLastExecutedStep(index int, txnID uuid.UUID, nodeID uuid.UUID) error { + key := tm.getStoreKey(txnID.String(), LastExecutedStepPrefix, nodeID.String()) + _, err := store.Put(context.TODO(), key, strconv.Itoa(index)) + return err +} + +// GetLastExecutedStep fetches the last executed step on a node for a given txn ID +func (tm *txnManager) GetLastExecutedStep(txnID uuid.UUID, nodeID uuid.UUID) (int, error) { + key := tm.getStoreKey(txnID.String(), LastExecutedStepPrefix, nodeID.String()) + resp, err := store.Get(context.TODO(), key) + if err != nil { + return 0, err + } + + if len(resp.Kvs) == 0 { + return 0, errors.New(key + " key not found") + } + + kv := resp.Kvs[0] + return strconv.Atoi(string(kv.Value)) +} + +// WatchLastExecutedStep watches for last executed step on a node for a given txn ID +func (tm *txnManager) WatchLastExecutedStep(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan int { + var ( + lastExecutedStepChan = make(chan int) + key = tm.getStoreKey(txnID.String(), 
LastExecutedStepPrefix, nodeID.String()) + opts = []clientv3.OpOption{} + ) + + resp, err := store.Get(context.TODO(), key) + if err == nil && resp.Count == 1 { + opts = append(opts, clientv3.WithRev(resp.Kvs[0].CreateRevision)) + } + + respHandler := func(response clientv3.WatchResponse) { + for _, event := range response.Events { + lastStep := string(event.Kv.Value) + if i, err := strconv.Atoi(lastStep); err == nil { + lastExecutedStepChan <- i + } + } + } + tm.watch(stopCh, key, respHandler, opts...) + return lastExecutedStepChan +} + +func (tm *txnManager) watch(stopCh <-chan struct{}, key string, respHandler func(clientv3.WatchResponse), opts ...clientv3.OpOption) { + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + watchRespChan := tm.storeWatcher.Watch(ctx, key, opts...) + for { + select { + case <-stopCh: + return + case watchResp := <-watchRespChan: + if watchResp.Err() != nil || watchResp.Canceled { + return + } + respHandler(watchResp) + + } + } + }() +} diff --git a/glusterd2/transactionv2/types.go b/glusterd2/transactionv2/types.go new file mode 100644 index 000000000..425ccb43b --- /dev/null +++ b/glusterd2/transactionv2/types.go @@ -0,0 +1,34 @@ +package transaction + +// TxnState represents state of a txn +type TxnState string + +const ( + txnPending TxnState = "Pending" + txnRunning TxnState = "Running" + txnSucceeded TxnState = "Succeeded" + txnFailed TxnState = "Failed" + txnUnknown TxnState = "Unknown" +) + +// Valid returns whether a TxnState is valid or not +func (ts TxnState) Valid() bool { + switch ts { + case txnPending: + fallthrough + case txnRunning: + fallthrough + case txnSucceeded: + fallthrough + case txnFailed: + return true + default: + return false + } +} + +// TxnStatus represents status of a txn +type TxnStatus struct { + State TxnState `json:"txn_state"` + TxnErr *TransactionError `json:"txn_error,omitempty"` +} diff --git a/glusterd2/transactionv2/utils.go b/glusterd2/transactionv2/utils.go new file mode 100644 index 000000000..b7b03c6fe --- /dev/null +++ b/glusterd2/transactionv2/utils.go @@ -0,0 +1,73 @@ +package transaction + +import ( + "fmt" + "runtime" + "time" + + log "github.com/sirupsen/logrus" +) + +// NeverStop can be used in UntilStop to make it never stop +var NeverStop <-chan struct{} = make(chan struct{}) + +// UntilStop loops until stop channel is closed, running f every d duration +func UntilStop(f func(), d time.Duration, stop <-chan struct{}) { + var ( + t *time.Timer + timeout bool + ) + + for { + select { + case <-stop: + return + default: + } + func() { + defer HandlePanic() + f() + }() + t = ResetTimer(t, d, timeout) + select { + case <-stop: + return + case <-t.C: + timeout = true + } + } +} + +// ResetTimer avoids allocating a new timer if one is already in use +func ResetTimer(t *time.Timer, dur time.Duration, timeout bool) *time.Timer { + if t == nil { + return time.NewTimer(dur) + } + if !t.Stop() && !timeout { + <-t.C + } + t.Reset(dur) + return t +} + +// HandlePanic simply recovers from a panic and logs an error. +func HandlePanic() { + if r := recover(); r != nil { + callers := getCallers() + log.WithFields(log.Fields{ + "panic": r, + "callers": callers, + }).Error("recovered from panic") + } +} + +func getCallers() (callers string) { + for i := 0; true; i++ { + _, file, line, ok := runtime.Caller(i) + if !ok { + return + } + callers += fmt.Sprintf("%v:%v\n", file, line) + } + return +}
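Usage sketch (not part of the diff): the volume-create.go hunk above shows the handler-side changes in isolation, so here is a minimal end-to-end illustration of the v2 flow. The helper name, its arguments, and the package placement are hypothetical; the transaction API calls and the "vol-create.ValidateBricks" step name match the code added in this diff.

```go
package volumecommands // illustrative placement, mirroring commands/volumes

import (
	"context"

	"github.com/gluster/glusterd2/glusterd2/transaction"
	transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2"

	"github.com/pborman/uuid"
)

// createVolumeTxn is a hypothetical helper showing the v2 flow from a
// REST handler's point of view.
func createVolumeTxn(ctx context.Context, volName string, nodes []uuid.UUID) error {
	// Unlike v1, NewTxnWithLocks only records the lock IDs on the Txn;
	// the cluster-wide locks are acquired later by the initiator's
	// engine (via WithClusterLocks inside Execute).
	txn, err := transactionv2.NewTxnWithLocks(ctx, volName)
	if err != nil {
		return err
	}
	defer txn.Done()

	txn.Steps = []*transaction.Step{
		{
			DoFunc: "vol-create.ValidateBricks",
			Nodes:  nodes,
			// Sync: the engine waits until every involved node has
			// finished the preceding step before running this one.
			Sync: true,
		},
	}

	// Do commits the txn context, stores the txn under the
	// pending-transaction/ namespace and blocks until all nodes report
	// Succeeded, any node reports Failed, or the 30s txnTimeOut fires.
	return txn.Do()
}
```

With v2 the handler no longer drives steps over RPC: Do() only persists the txn to the store, and the per-node engines started by StartTxnEngine() in main.go pick it up from the `pending-transaction` namespace, execute the steps locally, and report status back through the same namespace (which is also how step synchronization and rollback of failed txns are coordinated).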