diff --git a/glusterd2/commands/peers/peer-rpc-svc.go b/glusterd2/commands/peers/peer-rpc-svc.go index b7211c1b3..d37f3b734 100644 --- a/glusterd2/commands/peers/peer-rpc-svc.go +++ b/glusterd2/commands/peers/peer-rpc-svc.go @@ -8,10 +8,12 @@ import ( "github.com/gluster/glusterd2/glusterd2/peer" "github.com/gluster/glusterd2/glusterd2/servers/peerrpc" "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transactionv2" + "github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gluster/glusterd2/pkg/utils" - "github.com/pborman/uuid" + "github.com/pborman/uuid" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) @@ -169,6 +171,8 @@ func ReconfigureStore(c *StoreConfig) error { // Stop events framework events.Stop() + transaction.StopTxnEngine() + cleanuphandler.StopCleanupLeader() // do not delete cluster namespace if this is not a loner node var deleteNamespace bool @@ -217,7 +221,8 @@ func ReconfigureStore(c *StoreConfig) error { // Now that new store is up, start events framework events.Start() - + transaction.StartTxnEngine() + cleanuphandler.StartCleanupLeader() return nil } diff --git a/glusterd2/commands/volumes/volume-create.go b/glusterd2/commands/volumes/volume-create.go index 8ddd4db53..9a87f498c 100644 --- a/glusterd2/commands/volumes/volume-create.go +++ b/glusterd2/commands/volumes/volume-create.go @@ -11,6 +11,7 @@ import ( "github.com/gluster/glusterd2/glusterd2/gdctx" restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils" "github.com/gluster/glusterd2/glusterd2/transaction" + transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gluster/glusterd2/pkg/api" gderrors "github.com/gluster/glusterd2/pkg/errors" @@ -154,7 +155,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) { return } - txn, err := transaction.NewTxnWithLocks(ctx, req.Name) + txn, err := transactionv2.NewTxnWithLocks(ctx, req.Name) if err != nil { status, err := restutils.ErrToStatusCode(err) restutils.SendHTTPError(ctx, w, status, err) @@ -175,6 +176,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) { }, { DoFunc: "vol-create.ValidateBricks", + Sync: true, Nodes: nodes, }, { diff --git a/glusterd2/main.go b/glusterd2/main.go index cc7f4efd2..4c5e5deb1 100644 --- a/glusterd2/main.go +++ b/glusterd2/main.go @@ -15,6 +15,8 @@ import ( "github.com/gluster/glusterd2/glusterd2/pmap" "github.com/gluster/glusterd2/glusterd2/servers" "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transactionv2" + "github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler" gdutils "github.com/gluster/glusterd2/glusterd2/utils" "github.com/gluster/glusterd2/glusterd2/xlator" "github.com/gluster/glusterd2/pkg/errors" @@ -103,6 +105,8 @@ func main() { log.WithError(err).Fatal("Failed to initialize store (etcd client)") } + transaction.StartTxnEngine() + cleanuphandler.StartCleanupLeader() // Start the events framework after store is up if err := events.Start(); err != nil { log.WithError(err).Fatal("Failed to start internal events framework") @@ -156,6 +160,8 @@ func main() { case unix.SIGINT: log.Info("Received SIGTERM. Stopping GlusterD") gdctx.IsTerminating = true + transaction.StopTxnEngine() + cleanuphandler.StopCleanupLeader() super.Stop() events.Stop() store.Close() diff --git a/glusterd2/transaction/context.go b/glusterd2/transaction/context.go index 5e58b0894..40d4c1077 100644 --- a/glusterd2/transaction/context.go +++ b/glusterd2/transaction/context.go @@ -33,29 +33,32 @@ type TxnCtx interface { // Logger returns the Logrus logger associated with the context Logger() log.FieldLogger - // commit writes all locally cached keys and values into the store using + // Commit writes all locally cached keys and values into the store using // a single etcd transaction. This is for internal use by the txn framework // and hence isn't exported. - commit() error + Commit() error + + // SyncCache synchronizes the locally cached keys and values from the store + SyncCache() error } // Tctx represents structure for transaction context type Tctx struct { - config *txnCtxConfig // this will be marshalled and sent on wire + config *TxnCtxConfig // this will be marshalled and sent on wire logger log.FieldLogger readSet map[string][]byte // cached responses from store readCacheDirty bool writeSet map[string]string // to be written to store } -// txnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx +// TxnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx // on receiver's end. -type txnCtxConfig struct { +type TxnCtxConfig struct { LogFields log.Fields StorePrefix string } -func newCtx(config *txnCtxConfig) *Tctx { +func newCtx(config *TxnCtxConfig) *Tctx { return &Tctx{ config: config, logger: log.StandardLogger().WithFields(config.LogFields), @@ -65,6 +68,11 @@ func newCtx(config *txnCtxConfig) *Tctx { } } +// NewCtx returns a transaction context from given config +func NewCtx(config *TxnCtxConfig) *Tctx { + return newCtx(config) +} + // Set attaches the given key-value pair to the context. // If the key exists, the value will be updated. func (c *Tctx) Set(key string, value interface{}) error { @@ -86,9 +94,22 @@ func (c *Tctx) Set(key string, value interface{}) error { return nil } -// commit writes all locally cached keys and values into the store using +// SyncCache synchronizes the locally cached keys and values from the store +func (c *Tctx) SyncCache() error { + resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + + for _, kv := range resp.Kvs { + c.readSet[string(kv.Key)] = kv.Value + } + return nil +} + +// Commit writes all locally cached keys and values into the store using // a single etcd transaction. -func (c *Tctx) commit() error { +func (c *Tctx) Commit() error { if len(c.writeSet) == 0 { return nil @@ -120,6 +141,7 @@ func (c *Tctx) commit() error { expTxn.Add("txn_ctx_store_commit", 1) + c.writeSet = make(map[string]string) c.readCacheDirty = true return nil @@ -139,15 +161,10 @@ func (c *Tctx) Get(key string, value interface{}) error { // cache all keys and values from the store on the first call to Get if c.readCacheDirty { - resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix()) - if err != nil { + if err := c.SyncCache(); err != nil { c.logger.WithError(err).WithField("key", key).Error("failed to get key from transaction context") return err } - expTxn.Add("txn_ctx_store_get", 1) - for _, kv := range resp.Kvs { - c.readSet[string(kv.Key)] = kv.Value - } c.readCacheDirty = false } diff --git a/glusterd2/transaction/lock.go b/glusterd2/transaction/lock.go index 4fe6b0c65..772c51209 100644 --- a/glusterd2/transaction/lock.go +++ b/glusterd2/transaction/lock.go @@ -85,8 +85,8 @@ func CreateLockSteps(key string) (*Step, *Step, error) { return nil, nil, err } - lockStep := &Step{lockFunc, unlockFunc, []uuid.UUID{gdctx.MyUUID}, false} - unlockStep := &Step{unlockFunc, "", []uuid.UUID{gdctx.MyUUID}, false} + lockStep := &Step{DoFunc: lockFunc, UndoFunc: unlockFunc, Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} + unlockStep := &Step{DoFunc: unlockFunc, UndoFunc: "", Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} return lockStep, unlockStep, nil } @@ -149,15 +149,15 @@ func CreateLockFuncs(key string) (LockUnlockFunc, LockUnlockFunc) { return lockFunc, unlockFunc } -func (t *Txn) lock(lockID string) error { +// Locks are the collection of cluster wide transaction lock +type Locks map[string]*concurrency.Mutex + +func (l Locks) lock(lockID string) error { // Ensure that no prior lock exists for the given lockID in this transaction - if _, ok := t.locks[lockID]; ok { + if _, ok := l[lockID]; ok { return ErrLockExists } - logger := t.Ctx.Logger().WithField("lockID", lockID) - logger.Debug("attempting to obtain lock") - key := lockPrefix + lockID locker := concurrency.NewMutex(store.Store.Session, key) @@ -167,17 +167,14 @@ func (t *Txn) lock(lockID string) error { err := locker.Lock(ctx) switch err { case nil: - logger.Debug("lock obtained") // Attach lock to the transaction - t.locks[lockID] = locker + l[lockID] = locker case context.DeadlineExceeded: // Propagate this all the way back to the client as a HTTP 409 response - logger.Debug("timeout: failed to obtain lock") err = ErrLockTimeout default: - logger.WithError(err).Error("failed to obtain lock") } return err @@ -185,14 +182,23 @@ func (t *Txn) lock(lockID string) error { // Lock obtains a cluster wide transaction lock on the given lockID/lockIDs, // and attaches the obtained locks to the transaction -func (t *Txn) Lock(lockID string, lockIDs ...string) error { - if err := t.lock(lockID); err != nil { +func (l Locks) Lock(lockID string, lockIDs ...string) error { + if err := l.lock(lockID); err != nil { return err } for _, id := range lockIDs { - if err := t.lock(id); err != nil { + if err := l.lock(id); err != nil { return err } } return nil } + +// UnLock releases all cluster wide obtained locks +func (l Locks) UnLock(ctx context.Context) { + for lockID, locker := range l { + if err := locker.Unlock(ctx); err == nil { + delete(l, lockID) + } + } +} diff --git a/glusterd2/transaction/rpc-service.go b/glusterd2/transaction/rpc-service.go index ecc00b78c..43ca372aa 100644 --- a/glusterd2/transaction/rpc-service.go +++ b/glusterd2/transaction/rpc-service.go @@ -59,7 +59,7 @@ func (p *txnSvc) RunStep(rpcCtx context.Context, req *TxnStepReq) (*TxnStepResp, goto End } - if err = ctx.commit(); err != nil { + if err = ctx.Commit(); err != nil { logger.WithError(err).Error("failed to commit txn context to store") } diff --git a/glusterd2/transaction/step.go b/glusterd2/transaction/step.go index 784d7c02e..69a89b61c 100644 --- a/glusterd2/transaction/step.go +++ b/glusterd2/transaction/step.go @@ -27,6 +27,7 @@ type Step struct { UndoFunc string Nodes []uuid.UUID Skip bool + Sync bool } var ( @@ -136,7 +137,7 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod var err error if uuid.Equal(node, gdctx.MyUUID) { - err = runStepFuncLocally(origCtx, stepName, ctx) + err = RunStepFuncLocally(origCtx, stepName, ctx) } else { // remote node err = runStepOn(origCtx, stepName, node, ctx) @@ -145,7 +146,8 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod respCh <- stepPeerResp{node, err} } -func runStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error { +// RunStepFuncLocally runs a step func on local node +func RunStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error { var err error @@ -163,7 +165,7 @@ func runStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) er if err = stepFunc(ctx); err == nil { // if step function executes successfully, commit the // results to the store - err = ctx.commit() + err = ctx.Commit() } } else { err = ErrStepFuncNotFound diff --git a/glusterd2/transaction/transaction.go b/glusterd2/transaction/transaction.go index 3cb5c3bad..deca9501d 100644 --- a/glusterd2/transaction/transaction.go +++ b/glusterd2/transaction/transaction.go @@ -26,7 +26,7 @@ var expTxn = expvar.NewMap("txn") // Txn is a set of steps type Txn struct { id uuid.UUID - locks map[string]*concurrency.Mutex + locks Locks reqID uuid.UUID storePrefix string @@ -49,7 +49,7 @@ func NewTxn(ctx context.Context) *Txn { t.reqID = gdctx.GetReqID(ctx) t.locks = make(map[string]*concurrency.Mutex) t.storePrefix = txnPrefix + t.id.String() + "/" - config := &txnCtxConfig{ + config := &TxnCtxConfig{ LogFields: log.Fields{ "txnid": t.id.String(), "reqid": t.reqID.String(), @@ -68,10 +68,16 @@ func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) { t := NewTxn(ctx) for _, id := range lockIDs { - if err := t.Lock(id); err != nil { + logger := t.Ctx.Logger().WithField("lockID", id) + logger.Debug("attempting to obtain lock") + + if err := t.locks.Lock(id); err != nil { + logger.WithError(err).Error("failed to obtain lock") t.Done() return nil, err } + + logger.Debug("lock obtained") } return t, nil @@ -125,7 +131,7 @@ func (t *Txn) Do() error { expTxn.Add("initiated_txn_in_progress", 1) // commit txn.Ctx.Set()s done in REST handlers to the store - if err := t.Ctx.commit(); err != nil { + if err := t.Ctx.Commit(); err != nil { return err } diff --git a/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go b/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go new file mode 100644 index 000000000..641c2e8d6 --- /dev/null +++ b/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go @@ -0,0 +1,172 @@ +package cleanuphandler + +import ( + "context" + "sync" + "time" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + txn "github.com/gluster/glusterd2/glusterd2/transaction" + "github.com/gluster/glusterd2/glusterd2/transactionv2" + + "github.com/coreos/etcd/clientv3" + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" +) + +const ( + leaderKey = "leader" + cleanupTimerDur = time.Second * 5 + txnMaxAge = time.Second * 20 + electionTimerDur = time.Second * 10 +) + +// CleanupLeader is responsible for performing all cleaning operation +var CleanupLeader *CleanupHandler + +// CleaupHandlerOptFunc accepts a CleanupHandler and overrides its members +type CleaupHandlerOptFunc func(handler *CleanupHandler) error + +// CleanupHandler performs all cleaning operation. +// It will remove all expired txn related data from store. +// A leader is elected among the peers in the cluster to +// cleanup stale transactions. The leader periodically scans +// the pending transaction namespace for failed and stale +// transactions, and cleans them up if rollback is completed +// by all peers involved in the transaction. +type CleanupHandler struct { + sync.Mutex + isLeader bool + locks txn.Locks + selfNodeID uuid.UUID + storeClient *clientv3.Client + stopChan chan struct{} + stopOnce sync.Once + txnManager transaction.TxnManager +} + +// NewCleanupHandler returns a new CleanupHandler +func NewCleanupHandler(optFuncs ...CleaupHandlerOptFunc) (*CleanupHandler, error) { + cl := &CleanupHandler{ + storeClient: store.Store.Client, + stopChan: make(chan struct{}), + txnManager: transaction.NewTxnManager(store.Store.Watcher), + selfNodeID: gdctx.MyUUID, + locks: make(txn.Locks), + } + + for _, optFunc := range optFuncs { + if err := optFunc(cl); err != nil { + return nil, err + } + } + + return cl, nil +} + +// Run starts running CleanupHandler +func (c *CleanupHandler) Run() { + log.Info("cleanup handler started") + + go transaction.UntilStop(c.HandleStaleTxn, cleanupTimerDur, c.stopChan) + go transaction.UntilStop(c.CleanFailedTxn, cleanupTimerDur, c.stopChan) + + <-c.stopChan + log.Info("cleanup handler stopped") +} + +// HandleStaleTxn will mark all the expired txn as failed based maxAge of a txn +func (c *CleanupHandler) HandleStaleTxn() { + c.Lock() + isLeader := c.isLeader + c.Unlock() + + if isLeader { + c.txnManager.TxnGC(txnMaxAge) + } + +} + +// CleanFailedTxn removes all failed txn if rollback is +// completed by all peers involved in the transaction +func (c *CleanupHandler) CleanFailedTxn() { + c.Lock() + isLeader := c.isLeader + c.Unlock() + + if isLeader { + c.txnManager.RemoveFailedTxns() + } +} + +// Stop will stop running the CleanupHandler +func (c *CleanupHandler) Stop() { + log.Info("attempting to stop cleanup handler") + c.stopOnce.Do(func() { + close(c.stopChan) + }) + + store.Delete(context.TODO(), leaderKey) + c.locks.UnLock(context.Background()) +} + +// StartElecting triggers a new election after every `electionTimerDur`. +// If it succeeded then it assumes the leader role and returns +func (c *CleanupHandler) StartElecting() { + log.Info("node started to contest for leader election") + + transaction.UntilSuccess(c.IsNodeElected, electionTimerDur, c.stopChan) + + log.Info("node got elected as cleanup leader") + c.Lock() + defer c.Unlock() + c.isLeader = true +} + +// IsNodeElected returns whether a node is elected as a leader or not. +// Leader attempts to set a common key using a transaction that checks +// if the key already exists. If not, the candidate leader sets the +// key with a lease and assumes the leader role. +func (c *CleanupHandler) IsNodeElected() bool { + var ( + leaseID = store.Store.Session.Lease() + lockID = gdctx.MyClusterID.String() + logger = log.WithField("lockID", lockID) + ) + + if err := c.locks.Lock(lockID); err != nil { + logger.WithError(err).Error("error in acquiring lock") + return false + } + defer c.locks.UnLock(context.Background()) + + resp, err := store.Txn(context.Background()). + If(clientv3.Compare(clientv3.Version(leaderKey), "=", 0)). + Then(clientv3.OpPut(leaderKey, c.selfNodeID.String(), clientv3.WithLease(leaseID))). + Commit() + + return (err == nil) && resp.Succeeded +} + +// StartCleanupLeader starts cleanup leader +func StartCleanupLeader() { + var err error + + CleanupLeader, err = NewCleanupHandler() + + if err != nil { + log.WithError(err).Errorf("failed in starting cleanup handler") + return + } + + go CleanupLeader.StartElecting() + go CleanupLeader.Run() +} + +// StopCleanupLeader stops the cleanup leader +func StopCleanupLeader() { + if CleanupLeader != nil { + CleanupLeader.Stop() + } +} diff --git a/glusterd2/transactionv2/engine.go b/glusterd2/transactionv2/engine.go new file mode 100644 index 000000000..b2ece7f5e --- /dev/null +++ b/glusterd2/transactionv2/engine.go @@ -0,0 +1,260 @@ +package transaction + +import ( + "context" + "errors" + "github.com/gluster/glusterd2/glusterd2/store" + "sync" + "sync/atomic" + "time" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" +) + +const ( + // PendingTxnPrefix is the etcd namespace into which all pending txn will be stored + PendingTxnPrefix = "pending-transaction/" + txnSyncTimeout = time.Second * 10 +) + +// TransactionEngine is responsible for executing newly added txn +var TransactionEngine *TxnEngine + +// TxnEngine executes the given transaction across the cluster. +// It makes use of etcd as the means of communication between nodes. +type TxnEngine struct { + stop chan struct{} + stopOnce sync.Once + selfNodeID uuid.UUID + stepManager StepManager + txnManager TxnManager +} + +// NewTxnEngine creates a TxnEngine +func NewTxnEngine() *TxnEngine { + return &TxnEngine{ + stop: make(chan struct{}), + selfNodeID: gdctx.MyUUID, + stepManager: newStepManager(), + txnManager: NewTxnManager(store.Store.Watcher), + } +} + +// Run will start running the TxnEngine and wait for txn Engine to be stopped. +func (txnEng *TxnEngine) Run() { + log.Info("running txn engine") + + go UntilStop(txnEng.HandleTransaction, 0, txnEng.stop) + go UntilStop(txnEng.HandleFailedTxn, 0, txnEng.stop) + + <-txnEng.stop + log.Info("txn engine stopped") +} + +// HandleTransaction executes newly added txn to the store. It will keep watching on +// `pending-transaction` namespace, if a new txn is added to the namespace then it will +// execute that txn. +func (txnEng *TxnEngine) HandleTransaction() { + txnChan := txnEng.txnManager.WatchTxn(txnEng.stop) + + for { + select { + case <-txnEng.stop: + return + case txn, ok := <-txnChan: + if !ok { + return + } + log.Debugf("received a txn %+v\n", txn) + go txnEng.Execute(context.Background(), txn) + } + } +} + +func (txnEng *TxnEngine) isInitiator(txn *Txn) bool { + return uuid.Equal(txn.Initiator, txnEng.selfNodeID) +} + +// Execute will run a given txn +func (txnEng *TxnEngine) Execute(ctx context.Context, txn *Txn) { + var shouldExecute bool + for _, node := range txn.Nodes { + if uuid.Equal(node, txnEng.selfNodeID) { + txn.Ctx.Logger().WithField("peerID", txnEng.selfNodeID.String()).Info("executing txn on peer") + shouldExecute = true + break + } + } + + if !shouldExecute { + txn.Ctx.Logger().WithField("peerID", txnEng.selfNodeID.String()).Info("peer is not involved in this txn") + return + } + + if txnEng.isInitiator(txn) { + if err := WithClusterLocks(txn.Locks...)(txn); err != nil { + txn.Ctx.Logger().WithError(err).Error("error in acquiring cluster lock") + return + } + defer txn.releaseLocks() + } + + txnStatus, err := txnEng.txnManager.GetTxnStatus(txn.ID, txnEng.selfNodeID) + if err != nil { + txn.Ctx.Logger().WithError(err).Error("error in getting txn status") + return + } + + switch txnStatus.State { + case txnPending: + if err := txnEng.executePendingTxn(ctx, txn); err != nil { + status := TxnStatus{State: txnFailed, Reason: err.Error(), TxnID: txn.ID} + txnEng.txnManager.UpDateTxnStatus(status, txn.ID, txnEng.selfNodeID) + return + } + txn.Ctx.Logger().Info("txn succeeded") + txnEng.txnManager.UpDateTxnStatus(TxnStatus{State: txnSucceeded, TxnID: txn.ID}, txn.ID, txnEng.selfNodeID) + } + return +} + +func (txnEng *TxnEngine) executePendingTxn(ctx context.Context, txn *Txn) error { + var ( + stopch = make(chan struct{}) + txnStatusChan = txnEng.txnManager.WatchTxnStatus(stopch, txn.ID, txnEng.selfNodeID) + updateOnce = &sync.Once{} + ) + defer close(stopch) + + for i, step := range txn.Steps { + if step.Sync { + txn.Ctx.Logger().WithField("stepname", step.DoFunc).Info("synchronizing txn step") + if err := txnEng.stepManager.SyncStep(ctx, i, txn); err != nil { + txn.Ctx.Logger().WithError(err).Errorf("encounter an error in synchronizing txn step %+v", step) + return err + } + txn.Ctx.Logger().Info("transaction got synchronized") + } + + if err := txnEng.stepManager.RunStep(ctx, step, txn.Ctx); err != nil { + txn.Ctx.Logger().WithError(err).Errorf("failed in executing step %+v", step) + txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + return err + } + + select { + case <-ctx.Done(): + txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + return ctx.Err() + case status := <-txnStatusChan: + if status.State == txnFailed { + txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + return errors.New(status.Reason) + } + default: + } + + txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) + updateOnce.Do(func() { + txnEng.txnManager.UpDateTxnStatus(TxnStatus{State: txnRunning, TxnID: txn.ID}, txn.ID, txnEng.selfNodeID) + }) + } + return nil +} + +func (txnEng *TxnEngine) syncTxn(txn *Txn, syncStep int, stopCh <-chan struct{}, txnStatusChan <-chan TxnStatus) error { + var ( + nodesSynced int64 + syncTimer = time.NewTimer(txnSyncTimeout) + ) + + for _, nodeID := range txn.Nodes { + go func(syncStep int, txnID uuid.UUID, nodeID uuid.UUID) { + lastStepWatchChan := txnEng.txnManager.WatchLastExecutedStep(stopCh, txnID, nodeID) + for { + select { + case <-stopCh: + return + case lastStep := <-lastStepWatchChan: + if lastStep == syncStep-1 { + atomic.AddInt64(&nodesSynced, 1) + return + } + } + } + }(syncStep, txn.ID, nodeID) + } + + for { + select { + case <-syncTimer.C: + return errTxnSyncTimeout + case <-stopCh: + return nil + case status := <-txnStatusChan: + if status.State == txnFailed { + return errors.New(status.Reason) + } + default: + } + + if count := atomic.LoadInt64(&nodesSynced); count == int64(len(txn.Nodes)) { + return nil + } + } +} + +// HandleFailedTxn keep on watching store for failed txn. If it receives any failed +// txn then it will rollback all executed steps of that txn. +func (txnEng *TxnEngine) HandleFailedTxn() { + failedTxnChan := txnEng.txnManager.WatchFailedTxn(txnEng.stop, txnEng.selfNodeID) + + for { + select { + case <-txnEng.stop: + return + case failedTxn, ok := <-failedTxnChan: + if !ok { + return + } + + lastStepIndex, err := txnEng.txnManager.GetLastExecutedStep(failedTxn.ID, txnEng.selfNodeID) + if err != nil || lastStepIndex == -1 { + continue + } + failedTxn.Ctx.Logger().Debugf("received a failed txn, rolling back changes") + + for i := lastStepIndex; i >= 0; i-- { + err := txnEng.stepManager.RollBackStep(context.Background(), failedTxn.Steps[i], failedTxn.Ctx) + if err != nil { + failedTxn.Ctx.Logger().WithError(err).WithField("step", failedTxn.Steps[i]).Error("failed in rolling back step") + } + } + txnEng.txnManager.UpdateLastExecutedStep(-1, failedTxn.ID, txnEng.selfNodeID) + } + } +} + +// Stop will stop a running Txn Engine +func (txnEng *TxnEngine) Stop() { + log.Info("stopping txn engine") + txnEng.stopOnce.Do(func() { + close(txnEng.stop) + }) +} + +// StartTxnEngine creates a new Txn Engine and starts running it +func StartTxnEngine() { + TransactionEngine = NewTxnEngine() + GlobalTxnManager = NewTxnManager(store.Store.Watcher) + go TransactionEngine.Run() +} + +// StopTxnEngine stops the Txn Engine +func StopTxnEngine() { + if TransactionEngine != nil { + TransactionEngine.Stop() + } +} diff --git a/glusterd2/transactionv2/errors.go b/glusterd2/transactionv2/errors.go new file mode 100644 index 000000000..0b354cdfe --- /dev/null +++ b/glusterd2/transactionv2/errors.go @@ -0,0 +1,8 @@ +package transaction + +import "errors" + +var ( + errTxnTimeout = errors.New("txn timeout") + errTxnSyncTimeout = errors.New("timeout in synchronizing txn") +) diff --git a/glusterd2/transactionv2/steprunner.go b/glusterd2/transactionv2/steprunner.go new file mode 100644 index 000000000..46002c920 --- /dev/null +++ b/glusterd2/transactionv2/steprunner.go @@ -0,0 +1,104 @@ +package transaction + +import ( + "context" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transaction" + + "github.com/pborman/uuid" +) + +// StepManager is an interface for running a step and also rollback step on local node +type StepManager interface { + RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error + RollBackStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error + SyncStep(ctx context.Context, stepIndex int, txn *Txn) error +} + +type stepManager struct { + selfNodeID uuid.UUID +} + +func newStepManager() StepManager { + return &stepManager{ + selfNodeID: gdctx.MyUUID, + } +} + +func (sm *stepManager) shouldRunStep(step *transaction.Step) bool { + if step.Skip { + return false + } + + for _, id := range step.Nodes { + if uuid.Equal(sm.selfNodeID, id) { + return true + } + } + return false +} + +func (sm *stepManager) runStep(ctx context.Context, stepName string, txnCtx transaction.TxnCtx) error { + txnCtx.SyncCache() + return transaction.RunStepFuncLocally(ctx, stepName, txnCtx) +} + +func (sm *stepManager) isPrevStepsExecutedOnNode(ctx context.Context, syncStepIndex int, nodeID uuid.UUID, txnID uuid.UUID, success chan<- struct{}) { + txnManager := NewTxnManager(store.Store.Watcher) + lastStepWatchChan := txnManager.WatchLastExecutedStep(ctx.Done(), txnID, nodeID) + for { + select { + case <-ctx.Done(): + return + case lastStep := <-lastStepWatchChan: + if lastStep == syncStepIndex-1 { + success <- struct{}{} + return + } + } + } +} + +// SyncStep synchronises a step of given txn across all nodes of cluster +func (sm *stepManager) SyncStep(ctx context.Context, syncStepIndex int, txn *Txn) error { + var ( + success = make(chan struct{}) + syncCtx, cancel = context.WithTimeout(ctx, txnSyncTimeout) + ) + defer cancel() + + for _, nodeID := range txn.Nodes { + go sm.isPrevStepsExecutedOnNode(syncCtx, syncStepIndex, nodeID, txn.ID, success) + } + + for range txn.Nodes { + select { + case <-syncCtx.Done(): + return errTxnSyncTimeout + case <-success: + } + } + return nil +} + +// RollBackStep will rollback a given step on local node +func (sm *stepManager) RollBackStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error { + if !sm.shouldRunStep(step) { + return nil + } + + if step.UndoFunc != "" { + return sm.runStep(ctx, step.UndoFunc, txnCtx) + } + return nil +} + +// RunStepRunStep will execute the step on local node +func (sm *stepManager) RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error { + if !sm.shouldRunStep(step) { + return nil + } + return sm.runStep(ctx, step.DoFunc, txnCtx) +} diff --git a/glusterd2/transactionv2/transaction.go b/glusterd2/transactionv2/transaction.go new file mode 100644 index 000000000..7ed233c0d --- /dev/null +++ b/glusterd2/transactionv2/transaction.go @@ -0,0 +1,248 @@ +// Package transaction implements a distributed transaction handling framework +package transaction + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transaction" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" +) + +const ( + txnPrefix = "transaction/" + txnTimeOut = time.Second * 15 +) + +// TxnOptFunc receives a Txn and overrides its members +type TxnOptFunc func(*Txn) error + +// Txn is a set of steps +type Txn struct { + locks transaction.Locks + + // Nodes is the union of the all the TxnStep.Nodes and is implicitly + // set in Txn.Do(). This list is used to determine liveness of the + // nodes before running the transaction steps. + Nodes []uuid.UUID `json:"nodes"` + StorePrefix string `json:"store_prefix"` + ID uuid.UUID `json:"id"` + Locks []string `json:"locks"` + ReqID uuid.UUID `json:"req_id"` + Ctx transaction.TxnCtx `json:"ctx"` + Steps []*transaction.Step `json:"steps"` + DontCheckAlive bool `json:"dont_check_alive"` + DisableRollback bool `json:"disable_rollback"` + Initiator uuid.UUID `json:"initiator"` + StartTime time.Time `json:"start_time"` + + success chan struct{} + error chan error +} + +// NewTxn returns an initialized Txn without any steps +func NewTxn(ctx context.Context) *Txn { + t := new(Txn) + + t.ID = uuid.NewRandom() + t.ReqID = gdctx.GetReqID(ctx) + t.locks = make(map[string]*concurrency.Mutex) + t.StorePrefix = txnPrefix + t.ID.String() + "/" + config := &transaction.TxnCtxConfig{ + LogFields: log.Fields{ + "txnid": t.ID.String(), + "reqid": t.ReqID.String(), + }, + StorePrefix: t.StorePrefix, + } + t.Ctx = transaction.NewCtx(config) + t.Initiator = gdctx.MyUUID + t.Ctx.Logger().Debug("new transaction created") + return t +} + +// NewTxnWithLocks returns an empty Txn with locks obtained on given lockIDs +func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) { + t := NewTxn(ctx) + t.Locks = lockIDs + return t, nil +} + +// WithClusterLocks obtains a cluster wide locks on given IDs for a txn +func WithClusterLocks(lockIDs ...string) TxnOptFunc { + return func(t *Txn) error { + for _, id := range lockIDs { + logger := t.Ctx.Logger().WithField("lockID", id) + logger.Debug("attempting to obtain lock") + if err := t.locks.Lock(id); err != nil { + logger.WithError(err).Error("failed to obtain lock") + t.releaseLocks() + return err + } + logger.Debug("lock obtained") + } + return nil + } +} + +func (t *Txn) releaseLocks() { + t.locks.UnLock(context.Background()) +} + +// Done releases any obtained locks and cleans up the transaction namespace +// Done must be called after a transaction ends +func (t *Txn) Done() { + +} + +func (t *Txn) done() { + if _, err := store.Delete(context.TODO(), t.StorePrefix, clientv3.WithPrefix()); err != nil { + t.Ctx.Logger().WithError(err).WithField("key", + t.StorePrefix).Error("Failed to remove transaction namespace from store") + } + +} + +func (t *Txn) checkAlive() error { + + if len(t.Nodes) == 0 { + for _, s := range t.Steps { + t.Nodes = append(t.Nodes, s.Nodes...) + } + } + t.Nodes = nodesUnion(t.Nodes) + + for _, node := range t.Nodes { + // TODO: Using prefixed query, get all alive nodes in a single etcd query + if _, online := store.Store.IsNodeAlive(node); !online { + return fmt.Errorf("node %s is probably down", node.String()) + } + } + + return nil +} + +// Do runs the transaction on the cluster +func (t *Txn) Do() error { + var ( + stop = make(chan struct{}) + timer = time.NewTimer(txnTimeOut) + ) + + { + t.success = make(chan struct{}) + t.error = make(chan error) + t.StartTime = time.Now() + } + + defer timer.Stop() + + if !t.DontCheckAlive { + if err := t.checkAlive(); err != nil { + return err + } + } + + t.Ctx.Logger().Debug("Starting transaction") + + go t.waitForCompletion(stop) + + GlobalTxnManager.UpDateTxnStatus(TxnStatus{State: txnPending, TxnID: t.ID}, t.ID, t.Nodes...) + + // commit txn.Ctx.Set()s done in REST handlers to the store + if err := t.Ctx.Commit(); err != nil { + return err + } + + t.Ctx.Logger().Debug("adding txn to store") + if err := GlobalTxnManager.AddTxn(t); err != nil { + return err + } + t.Ctx.Logger().Debug("waiting for txn to be cleaned up") + + select { + case <-t.success: + close(stop) + t.done() + GlobalTxnManager.RemoveTransaction(t.ID) + t.Ctx.Logger().Info("txn succeeded on all nodes, cleaned up from store") + case err := <-t.error: + t.Ctx.Logger().WithError(err).Error("error in executing txn, marking as failure") + close(stop) + txnStatus := TxnStatus{State: txnFailed, TxnID: t.ID, Reason: err.Error()} + GlobalTxnManager.UpDateTxnStatus(txnStatus, t.ID, t.Nodes...) + return err + case <-timer.C: + t.Ctx.Logger().Error("time out in cleaning txn, marking as failure") + close(stop) + for _, nodeID := range t.Nodes { + txnStatus := TxnStatus{State: txnFailed, TxnID: t.ID, Reason: "txn timed out"} + GlobalTxnManager.UpDateTxnStatus(txnStatus, t.ID, nodeID) + } + return errTxnTimeout + } + + return nil +} + +func (t *Txn) isNodeSucceded(nodeID uuid.UUID, success chan<- struct{}, stopCh <-chan struct{}) { + txnStatusChan := GlobalTxnManager.WatchTxnStatus(stopCh, t.ID, nodeID) + + for { + select { + case <-stopCh: + return + case status := <-txnStatusChan: + log.WithFields(log.Fields{ + "nodeId": nodeID.String(), + "status": fmt.Sprintf("%+v", status), + }).Debug("state received") + + if status.State == txnSucceeded { + success <- struct{}{} + return + } else if status.State == txnFailed { + t.error <- errors.New(status.Reason) + return + } + } + } +} + +func (t *Txn) waitForCompletion(stopCh <-chan struct{}) { + var successChan = make(chan struct{}) + + for _, nodeID := range t.Nodes { + go t.isNodeSucceded(nodeID, successChan, stopCh) + } + + for range t.Nodes { + select { + case <-stopCh: + return + case <-successChan: + } + } + t.success <- struct{}{} +} + +// nodesUnion removes duplicate nodes +func nodesUnion(nodes []uuid.UUID) []uuid.UUID { + for i := 0; i < len(nodes); i++ { + for j := i + 1; j < len(nodes); j++ { + if uuid.Equal(nodes[i], nodes[j]) { + nodes = append(nodes[:j], nodes[j+1:]...) + j-- + } + } + } + return nodes +} diff --git a/glusterd2/transactionv2/txnmanager.go b/glusterd2/transactionv2/txnmanager.go new file mode 100644 index 000000000..3908afa89 --- /dev/null +++ b/glusterd2/transactionv2/txnmanager.go @@ -0,0 +1,436 @@ +package transaction + +import ( + "context" + "encoding/json" + "errors" + "path" + "strconv" + "strings" + "sync" + "time" + + "github.com/gluster/glusterd2/glusterd2/store" + "github.com/gluster/glusterd2/glusterd2/transaction" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/pborman/uuid" +) + +// GlobalTxnManager stores and manages access to transaction related data +var GlobalTxnManager TxnManager + +const ( + // TxnStatusPrefix is etcd key prefix under which status of a txn is stored for a particular node + // eg.. key for storing status:- pending-transaction///status + TxnStatusPrefix = "status" + // LastExecutedStepPrefix is etcd key prefix under which last step executed on a particular node for a txn is stored + // eg.. key for storing last executed step on a node:- pending-transaction///laststep + LastExecutedStepPrefix = "laststep" + // etcd txn timeout in seconds + etcdTxnTimeout = time.Second * 5 +) + +// TxnManager stores and manages access to transaction related data in +// `pending-transaction` namespace. +type TxnManager interface { + WatchTxn(stopCh <-chan struct{}) <-chan *Txn + GetTxns() []*Txn + AddTxn(txn *Txn) error + GetTxnByUUID(id uuid.UUID) (*Txn, error) + RemoveTransaction(txnID uuid.UUID) error + UpdateLastExecutedStep(index int, txnID uuid.UUID, nodeID uuid.UUID) error + GetLastExecutedStep(txnID uuid.UUID, nodeID uuid.UUID) (int, error) + WatchLastExecutedStep(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan int + WatchFailedTxn(stopCh <-chan struct{}, nodeID uuid.UUID) <-chan *Txn + WatchTxnStatus(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan TxnStatus + GetTxnStatus(txnID uuid.UUID, nodeID uuid.UUID) (TxnStatus, error) + UpDateTxnStatus(state TxnStatus, txnID uuid.UUID, nodeIDs ...uuid.UUID) error + TxnGC(maxAge time.Duration) + RemoveFailedTxns() +} + +type txnManager struct { + sync.Mutex + getStoreKey func(...string) string + storeWatcher clientv3.Watcher +} + +// NewTxnManager returns a TxnManager +func NewTxnManager(storeWatcher clientv3.Watcher) TxnManager { + tm := &txnManager{ + storeWatcher: storeWatcher, + } + tm.getStoreKey = func(s ...string) string { + key := path.Join(PendingTxnPrefix, path.Join(s...)) + return key + } + return tm +} + +// RemoveTransaction removes a transaction from `pending-transaction namespace` +func (tm *txnManager) RemoveTransaction(txnID uuid.UUID) error { + _, err := store.Delete(context.TODO(), tm.getStoreKey(txnID.String()), clientv3.WithPrefix()) + return err +} + +// WatchTxnStatus watches status of txn on a particular node +func (tm *txnManager) WatchTxnStatus(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan TxnStatus { + var ( + txnStatusChan = make(chan TxnStatus, 10) + key = tm.getStoreKey(txnID.String(), nodeID.String(), TxnStatusPrefix) + ) + + respHandler := func(response clientv3.WatchResponse) { + for _, event := range response.Events { + txnStatus := TxnStatus{} + if err := json.Unmarshal(event.Kv.Value, &txnStatus); err != nil { + continue + } + if !txnStatus.State.Valid() { + continue + } + txnStatusChan <- txnStatus + } + } + + tm.watch(stopCh, key, respHandler, clientv3.WithFilterDelete()) + return txnStatusChan +} + +// WatchTxn watches for newly added txn to store +func (tm *txnManager) WatchTxn(stopCh <-chan struct{}) <-chan *Txn { + var ( + txnChan = make(chan *Txn, 10) + key = tm.getStoreKey() + opts = []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithFilterDelete()} + ) + + respHandler := func(response clientv3.WatchResponse) { + for _, txn := range tm.watchRespToTxns(response) { + txnChan <- txn + } + } + + tm.watch(stopCh, key, respHandler, opts...) + + return txnChan +} + +// WatchFailedTxn watches for a failed txn on a particular node +func (tm *txnManager) WatchFailedTxn(stopCh <-chan struct{}, nodeID uuid.UUID) <-chan *Txn { + var ( + txnChan = make(chan *Txn) + key = tm.getStoreKey() + ops = []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithFilterDelete()} + ) + + go func() { + resp, err := store.Get(context.TODO(), key, ops...) + if err != nil { + return + } + for _, kv := range resp.Kvs { + if txn := tm.kvToFailedTxn(kv, nodeID); txn != nil { + txnChan <- txn + } + } + }() + + respHandler := func(resp clientv3.WatchResponse) { + for _, event := range resp.Events { + if txn := tm.kvToFailedTxn(event.Kv, nodeID); txn != nil { + txnChan <- txn + } + } + } + + tm.watch(stopCh, key, respHandler, ops...) + return txnChan +} + +func (tm *txnManager) kvToFailedTxn(kv *mvccpb.KeyValue, nodeID uuid.UUID) *Txn { + + if !strings.HasSuffix(string(kv.Key), TxnStatusPrefix) { + return nil + } + + prefix, _ := path.Split(string(kv.Key)) + nID := path.Base(prefix) + + if nodeID.String() != nID { + return nil + } + + txnStatus := &TxnStatus{} + if err := json.Unmarshal(kv.Value, txnStatus); err != nil { + return nil + } + + if txnStatus.State != txnFailed { + return nil + } + + txn, err := tm.GetTxnByUUID(txnStatus.TxnID) + if err != nil { + return nil + } + return txn +} + +func (tm *txnManager) watchRespToTxns(resp clientv3.WatchResponse) (txns []*Txn) { + for _, event := range resp.Events { + prefix, id := path.Split(string(event.Kv.Key)) + if uuid.Parse(id) == nil || !strings.HasSuffix(prefix, PendingTxnPrefix) { + continue + } + + txn := &Txn{Ctx: new(transaction.Tctx)} + if err := json.Unmarshal(event.Kv.Value, txn); err != nil { + continue + } + + txn.locks = make(map[string]*concurrency.Mutex) + txns = append(txns, txn) + } + return +} + +// AddTxn adds a txn to the store +func (tm *txnManager) AddTxn(txn *Txn) error { + data, err := json.Marshal(txn) + if err != nil { + return err + } + _, err = store.Put(context.TODO(), tm.getStoreKey(txn.ID.String()), string(data)) + return err +} + +// GetTxnByUUID returns the txn from given ID +func (tm *txnManager) GetTxnByUUID(id uuid.UUID) (*Txn, error) { + key := tm.getStoreKey(id.String()) + resp, err := store.Get(context.TODO(), key) + if err != nil { + return nil, err + } + + if len(resp.Kvs) == 0 { + return nil, errors.New(key + " key not found") + } + + kv := resp.Kvs[0] + + txn := &Txn{Ctx: new(transaction.Tctx)} + if err := json.Unmarshal(kv.Value, txn); err != nil { + return nil, err + } + txn.locks = make(map[string]*concurrency.Mutex) + return txn, nil +} + +// GetTxns returns all txns added to the store +func (tm *txnManager) GetTxns() (txns []*Txn) { + resp, err := store.Get(context.TODO(), tm.getStoreKey(), clientv3.WithPrefix()) + if err != nil { + return + } + for _, kv := range resp.Kvs { + _, id := path.Split(string(kv.Key)) + if uuid.Parse(id) == nil { + continue + } + + txn := &Txn{Ctx: new(transaction.Tctx)} + if err := json.Unmarshal(kv.Value, txn); err != nil { + continue + } + txn.locks = make(map[string]*concurrency.Mutex) + txns = append(txns, txn) + } + return +} + +// UpDateTxnStatus updates txn status for given nodes +func (tm *txnManager) UpDateTxnStatus(status TxnStatus, txnID uuid.UUID, nodeIDs ...uuid.UUID) error { + var ( + ctx, cancel = context.WithTimeout(context.Background(), etcdTxnTimeout) + storeMutex = concurrency.NewMutex(store.Store.Session, txnID.String()) + putOps []clientv3.Op + ) + + storeMutex.Lock(ctx) + defer cancel() + defer storeMutex.Unlock(ctx) + + data, err := json.Marshal(status) + if err != nil { + return err + } + + for _, nodeID := range nodeIDs { + key := tm.getStoreKey(txnID.String(), nodeID.String(), TxnStatusPrefix) + putOps = append(putOps, clientv3.OpPut(key, string(data))) + } + + txn, err := store.Txn(ctx).Then(putOps...).Commit() + if err != nil || !txn.Succeeded { + return errors.New("etcd txn to update txn status failed") + } + return nil +} + +// GetTxnStatus returns status of given txn on a particular node +func (tm *txnManager) GetTxnStatus(txnID uuid.UUID, nodeID uuid.UUID) (TxnStatus, error) { + var ( + ctx, cancel = context.WithCancel(context.Background()) + key = tm.getStoreKey(txnID.String(), nodeID.String(), TxnStatusPrefix) + storeMutex = concurrency.NewMutex(store.Store.Session, txnID.String()) + ) + + storeMutex.Lock(ctx) + defer cancel() + defer storeMutex.Unlock(ctx) + + resp, err := store.Get(context.TODO(), key) + if err != nil { + return TxnStatus{State: txnUnknown}, err + } + + if len(resp.Kvs) == 0 { + return TxnStatus{State: txnUnknown}, errors.New(key + " key not found") + } + + txnStatus := TxnStatus{} + kv := resp.Kvs[0] + + if err := json.Unmarshal(kv.Value, &txnStatus); err != nil { + return TxnStatus{State: txnUnknown}, err + } + + if !txnStatus.State.Valid() { + return TxnStatus{State: txnUnknown}, errors.New("invalid txn state") + } + + return txnStatus, nil +} + +// UpdateLastExecutedStep updates the last executed step on a node of a given txn ID +func (tm *txnManager) UpdateLastExecutedStep(index int, txnID uuid.UUID, nodeID uuid.UUID) error { + key := tm.getStoreKey(txnID.String(), nodeID.String(), LastExecutedStepPrefix) + _, err := store.Put(context.TODO(), key, strconv.Itoa(index)) + return err +} + +// GetLastExecutedStep fetches the last executed step on a node for a given txn ID +func (tm *txnManager) GetLastExecutedStep(txnID uuid.UUID, nodeID uuid.UUID) (int, error) { + key := tm.getStoreKey(txnID.String(), nodeID.String(), LastExecutedStepPrefix) + resp, err := store.Get(context.TODO(), key) + if err != nil { + return -1, err + } + + if resp.Count != 1 { + return -1, errors.New("more than one entry for same key") + } + + kv := resp.Kvs[0] + return strconv.Atoi(string(kv.Value)) +} + +// WatchLastExecutedStep watches for last executed step on a node for a given txn ID +func (tm *txnManager) WatchLastExecutedStep(stopCh <-chan struct{}, txnID uuid.UUID, nodeID uuid.UUID) <-chan int { + var ( + lastExecutedStepChan = make(chan int) + key = tm.getStoreKey(txnID.String(), nodeID.String(), LastExecutedStepPrefix) + opts = []clientv3.OpOption{clientv3.WithFilterDelete()} + ) + + resp, err := store.Get(context.TODO(), key) + if err == nil && resp.Count == 1 { + opts = append(opts, clientv3.WithRev(resp.Kvs[0].CreateRevision)) + } + + respHandler := func(response clientv3.WatchResponse) { + for _, event := range response.Events { + lastStep := string(event.Kv.Value) + if i, err := strconv.Atoi(lastStep); err == nil { + lastExecutedStepChan <- i + } + } + } + + tm.watch(stopCh, key, respHandler, opts...) + return lastExecutedStepChan +} + +// TxnGC will mark all the expired txn as failed based on given maxAge +func (tm *txnManager) TxnGC(maxAge time.Duration) { + tm.Lock() + defer tm.Unlock() + + txns := tm.GetTxns() + for _, txn := range txns { + if txn.StartTime.Unix()+int64(maxAge/time.Second) < time.Now().Unix() { + nonFailedNodes := []uuid.UUID{} + for _, nodeID := range txn.Nodes { + txnStatus, err := tm.GetTxnStatus(txn.ID, nodeID) + if err == nil && txnStatus.State != txnFailed { + nonFailedNodes = append(nonFailedNodes, nodeID) + } + } + if len(nonFailedNodes) == 0 { + continue + } + txnStatus := TxnStatus{State: txnFailed, TxnID: txn.ID, Reason: "txn expired"} + txn.Ctx.Logger().Info("txn got expired marking as failure") + tm.UpDateTxnStatus(txnStatus, txn.ID, nonFailedNodes...) + } + } +} + +// RemoveFailedTxns will remove all failed txn if rollback is completed by all peers involved in the transaction. +func (tm *txnManager) RemoveFailedTxns() { + txns := tm.GetTxns() + + for _, txn := range txns { + nodesRollbacked := 0 + + for _, nodeID := range txn.Nodes { + txnStatus, err := tm.GetTxnStatus(txn.ID, nodeID) + if err == nil && txnStatus.State == txnFailed { + lastStep, err := tm.GetLastExecutedStep(txn.ID, nodeID) + if err == nil && lastStep == -1 { + nodesRollbacked++ + } + } + } + + if nodesRollbacked == len(txn.Nodes) { + txn.Ctx.Logger().Info("txn rolled back on all nodes, cleaning from store") + txn.done() + tm.RemoveTransaction(txn.ID) + } + } +} + +func (tm *txnManager) watch(stopCh <-chan struct{}, key string, respHandler func(clientv3.WatchResponse), opts ...clientv3.OpOption) { + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + watchRespChan := tm.storeWatcher.Watch(ctx, key, opts...) + for { + select { + case <-stopCh: + return + case watchResp := <-watchRespChan: + if watchResp.Err() != nil || watchResp.Canceled { + return + } + respHandler(watchResp) + + } + } + }() +} diff --git a/glusterd2/transactionv2/types.go b/glusterd2/transactionv2/types.go new file mode 100644 index 000000000..d03febfaf --- /dev/null +++ b/glusterd2/transactionv2/types.go @@ -0,0 +1,37 @@ +package transaction + +import "github.com/pborman/uuid" + +// TxnState represents state of a txn +type TxnState string + +const ( + txnPending TxnState = "Pending" + txnRunning TxnState = "Running" + txnSucceeded TxnState = "Succeeded" + txnFailed TxnState = "Failed" + txnUnknown TxnState = "Unknown" +) + +// Valid returns whether a TxnState is valid or not +func (ts TxnState) Valid() bool { + switch ts { + case txnPending: + fallthrough + case txnRunning: + fallthrough + case txnSucceeded: + fallthrough + case txnFailed: + return true + default: + return false + } +} + +// TxnStatus represents status of a txn +type TxnStatus struct { + State TxnState `json:"txn_state"` + TxnID uuid.UUID `json:"txn_id"` + Reason string `json:"reason,omitempty"` +} diff --git a/glusterd2/transactionv2/utils.go b/glusterd2/transactionv2/utils.go new file mode 100644 index 000000000..bb4d4a247 --- /dev/null +++ b/glusterd2/transactionv2/utils.go @@ -0,0 +1,108 @@ +package transaction + +import ( + "fmt" + "runtime" + "time" + + log "github.com/sirupsen/logrus" +) + +// NeverStop can be used in UntilStop to make it never stop +var NeverStop <-chan struct{} = make(chan struct{}) + +// UntilStop loops until stop channel is closed, running f every d duration +func UntilStop(f func(), d time.Duration, stop <-chan struct{}) { + var ( + t *time.Timer + timeout bool + ) + + for { + select { + case <-stop: + return + default: + } + func() { + defer HandlePanic() + f() + }() + t = ResetTimer(t, d, timeout) + select { + case <-stop: + return + case <-t.C: + timeout = true + } + } +} + +// ResetTimer avoids allocating a new timer if one is already in use +func ResetTimer(t *time.Timer, dur time.Duration, timeout bool) *time.Timer { + if t == nil { + return time.NewTimer(dur) + } + if !t.Stop() && !timeout { + <-t.C + } + t.Reset(dur) + return t +} + +// HandlePanic simply recovers from a panic and logs an error. +func HandlePanic() { + if r := recover(); r != nil { + callers := getCallers() + log.WithFields(log.Fields{ + "panic": r, + "callers": callers, + }).Error("recovered from panic") + } +} + +func getCallers() (callers string) { + for i := 0; true; i++ { + _, file, line, ok := runtime.Caller(i) + if !ok { + return + } + callers += fmt.Sprintf("%v:%v\n", file, line) + } + return +} + +// UntilSuccess runs conditionFunc every d duration, until it got succeeded or stop chan is closed +func UntilSuccess(conditionFunc func() bool, d time.Duration, stop <-chan struct{}) { + var ( + t *time.Timer + timeout bool + success bool + ) + + for { + select { + case <-stop: + return + default: + } + + func() { + defer HandlePanic() + success = conditionFunc() + }() + + if success { + return + } + + t = ResetTimer(t, d, timeout) + + select { + case <-stop: + return + case <-t.C: + timeout = true + } + } +}