From 526cb048eed1a34164bdea08e2469b791555f089 Mon Sep 17 00:00:00 2001 From: Oshank Kumar Date: Wed, 31 Oct 2018 19:16:31 +0530 Subject: [PATCH] transaction: used etcd Election to elect cleanup leader Signed-off-by: Oshank Kumar --- .../cleanuphandler/cleanup_handler.go | 116 +++++++++--------- glusterd2/transactionv2/engine.go | 23 ++-- glusterd2/transactionv2/steprunner.go | 4 + 3 files changed, 75 insertions(+), 68 deletions(-) diff --git a/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go b/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go index cb86a67f3..ecff20ccb 100644 --- a/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go +++ b/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go @@ -2,24 +2,23 @@ package cleanuphandler import ( "context" + "fmt" "sync" "time" "github.com/gluster/glusterd2/glusterd2/gdctx" "github.com/gluster/glusterd2/glusterd2/store" - txn "github.com/gluster/glusterd2/glusterd2/transaction" "github.com/gluster/glusterd2/glusterd2/transactionv2" "github.com/coreos/etcd/clientv3" - "github.com/pborman/uuid" + "github.com/coreos/etcd/clientv3/concurrency" log "github.com/sirupsen/logrus" ) const ( - leaderKey = "leader" - cleanupTimerDur = time.Second * 5 - txnMaxAge = time.Second * 20 - electionTimerDur = time.Second * 10 + leaderKey = "cleanup-leader" + cleanupTimerDur = time.Minute * 2 + txnMaxAge = time.Second * 20 ) // CleanupLeader is responsible for performing all cleaning operation @@ -37,23 +36,45 @@ type CleaupHandlerOptFunc func(handler *CleanupHandler) error // by all peers involved in the transaction. type CleanupHandler struct { sync.Mutex - isLeader bool - locks txn.Locks - selfNodeID uuid.UUID - storeClient *clientv3.Client - stopChan chan struct{} - stopOnce sync.Once - txnManager transaction.TxnManager + isLeader bool + stopChan chan struct{} + stopOnce sync.Once + session *concurrency.Session + election *concurrency.Election + txnManager transaction.TxnManager +} + +// WithSession configures a session with given ttl +func WithSession(client *clientv3.Client, ttl int) CleaupHandlerOptFunc { + return func(handler *CleanupHandler) error { + session, err := concurrency.NewSession(client, concurrency.WithTTL(ttl)) + if err != nil { + return err + } + handler.session = session + return nil + } +} + +// WithElection creates a new election for CleanupHandler.It will use the `defaultSession` +// if no session has been configured previously. +func WithElection(defaultSession *concurrency.Session) CleaupHandlerOptFunc { + return func(handler *CleanupHandler) error { + session := defaultSession + if handler.session != nil { + session = handler.session + } + electionKeyPrefix := fmt.Sprintf("gluster-%s/", gdctx.MyClusterID.String()) + leaderKey + handler.election = concurrency.NewElection(session, electionKeyPrefix) + return nil + } } // NewCleanupHandler returns a new CleanupHandler func NewCleanupHandler(optFuncs ...CleaupHandlerOptFunc) (*CleanupHandler, error) { cl := &CleanupHandler{ - storeClient: store.Store.Client, - stopChan: make(chan struct{}), - txnManager: transaction.NewTxnManager(store.Store.Watcher), - selfNodeID: gdctx.MyUUID, - locks: make(txn.Locks), + stopChan: make(chan struct{}), + txnManager: transaction.NewTxnManager(store.Store.Watcher), } for _, optFunc := range optFuncs { @@ -85,7 +106,6 @@ func (c *CleanupHandler) HandleStaleTxn() { if isLeader { c.txnManager.TxnGC(txnMaxAge) } - } // CleanFailedTxn removes all failed txn if rollback is @@ -100,27 +120,16 @@ func (c *CleanupHandler) CleanFailedTxn() { } } -// Stop will stop running the CleanupHandler -func (c *CleanupHandler) Stop() { - log.Info("attempting to stop cleanup handler") - c.stopOnce.Do(func() { - close(c.stopChan) - }) - c.Lock() - defer c.Unlock() - isLeader := c.isLeader - if isLeader { - store.Delete(context.TODO(), leaderKey) - } - c.locks.UnLock(context.Background()) -} - -// StartElecting triggers a new election after every `electionTimerDur`. +// StartElecting triggers a new election campaign. // If it succeeded then it assumes the leader role and returns func (c *CleanupHandler) StartElecting() { log.Info("node started to contest for leader election") - transaction.UntilSuccess(c.IsNodeElected, electionTimerDur, c.stopChan) + if err := c.election.Campaign(context.Background(), gdctx.MyUUID.String()); err != nil { + log.WithError(err).Error("failed in campaign for cleanup leader election") + c.Stop() + return + } log.Info("node got elected as cleanup leader") c.Lock() @@ -128,36 +137,23 @@ func (c *CleanupHandler) StartElecting() { c.isLeader = true } -// IsNodeElected returns whether a node is elected as a leader or not. -// Leader attempts to set a common key using a transaction that checks -// if the key already exists. If not, the candidate leader sets the -// key with a lease and assumes the leader role. -func (c *CleanupHandler) IsNodeElected() bool { - var ( - leaseID = store.Store.Session.Lease() - lockID = gdctx.MyClusterID.String() - logger = log.WithField("lockID", lockID) - ) - - if err := c.locks.Lock(lockID); err != nil { - logger.WithError(err).Error("error in acquiring lock") - return false - } - defer c.locks.UnLock(context.Background()) - - resp, err := store.Txn(context.Background()). - If(clientv3.Compare(clientv3.Version(leaderKey), "=", 0)). - Then(clientv3.OpPut(leaderKey, c.selfNodeID.String(), clientv3.WithLease(leaseID))). - Commit() - - return (err == nil) && resp.Succeeded +// Stop will stop running the CleanupHandler +func (c *CleanupHandler) Stop() { + log.Info("attempting to stop cleanup handler") + c.stopOnce.Do(func() { + close(c.stopChan) + c.election.Resign(context.Background()) + }) } // StartCleanupLeader starts cleanup leader func StartCleanupLeader() { var err error - CleanupLeader, err = NewCleanupHandler() + CleanupLeader, err = NewCleanupHandler( + WithSession(store.Store.Client, 60), + WithElection(store.Store.Session), + ) if err != nil { log.WithError(err).Errorf("failed in starting cleanup handler") diff --git a/glusterd2/transactionv2/engine.go b/glusterd2/transactionv2/engine.go index 38d2720a6..d80f68759 100644 --- a/glusterd2/transactionv2/engine.go +++ b/glusterd2/transactionv2/engine.go @@ -3,11 +3,12 @@ package transaction import ( "context" "errors" - "github.com/gluster/glusterd2/glusterd2/store" "sync" "time" "github.com/gluster/glusterd2/glusterd2/gdctx" + "github.com/gluster/glusterd2/glusterd2/store" + "github.com/pborman/uuid" log "github.com/sirupsen/logrus" ) @@ -66,7 +67,7 @@ func (txnEng *Engine) HandleTransaction() { if !ok { return } - txn.Ctx.Logger().Debugf("received a pending txn") + txn.Ctx.Logger().Info("received a pending txn") go txnEng.Execute(context.Background(), txn) } } @@ -124,29 +125,34 @@ func (txnEng *Engine) executePendingTxn(ctx context.Context, txn *Txn) error { stopch = make(chan struct{}) txnStatusChan = txnEng.txnManager.WatchTxnStatus(stopch, txn.ID, txnEng.selfNodeID) updateOnce = &sync.Once{} + logger = txn.Ctx.Logger() ) defer close(stopch) - txn.Ctx.Logger().Infof("transaction started on node") + logger.Infof("transaction started on node") for i, step := range txn.Steps { + logger.WithField("stepname", step.DoFunc).Debug("running step func on node") + + // a synchronized step is executed only after all pervious steps + // have been completed successfully by all involved peers. if step.Sync { - txn.Ctx.Logger().WithField("stepname", step.DoFunc).Debug("synchronizing txn step") + logger.WithField("stepname", step.DoFunc).Debug("synchronizing txn step") if err := txnEng.stepManager.SyncStep(ctx, i, txn); err != nil { - txn.Ctx.Logger().WithFields(log.Fields{ + logger.WithFields(log.Fields{ "error": err, "stepname": step.DoFunc, }).Error("encounter an error in synchronizing txn step") return err } - txn.Ctx.Logger().Debug("transaction got synchronized") + logger.Debug("transaction got synchronized") } if err := txnEng.stepManager.RunStep(ctx, step, txn.Ctx); err != nil { - txn.Ctx.Logger().WithFields(log.Fields{ + logger.WithFields(log.Fields{ "error": err, "stepname": step.DoFunc, - }).Error("failed in executing step ") + }).Error("failed in executing txn step ") txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) return err } @@ -163,6 +169,7 @@ func (txnEng *Engine) executePendingTxn(ctx context.Context, txn *Txn) error { default: } + logger.WithField("stepname", step.DoFunc).Debug("step func executed successfully on node") txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID) updateOnce.Do(func() { txnEng.txnManager.UpDateTxnStatus(TxnStatus{State: txnRunning, TxnID: txn.ID}, txn.ID, txnEng.selfNodeID) diff --git a/glusterd2/transactionv2/steprunner.go b/glusterd2/transactionv2/steprunner.go index 46002c920..d1f3280b3 100644 --- a/glusterd2/transactionv2/steprunner.go +++ b/glusterd2/transactionv2/steprunner.go @@ -40,11 +40,15 @@ func (sm *stepManager) shouldRunStep(step *transaction.Step) bool { return false } +// runStep synchronises the locally cached keys and values from the store +// before running the step function on node func (sm *stepManager) runStep(ctx context.Context, stepName string, txnCtx transaction.TxnCtx) error { txnCtx.SyncCache() return transaction.RunStepFuncLocally(ctx, stepName, txnCtx) } +// isPrevStepsExecutedOnNode reports that all pervious steps +// have been completed successfully on a given node func (sm *stepManager) isPrevStepsExecutedOnNode(ctx context.Context, syncStepIndex int, nodeID uuid.UUID, txnID uuid.UUID, success chan<- struct{}) { txnManager := NewTxnManager(store.Store.Watcher) lastStepWatchChan := txnManager.WatchLastExecutedStep(ctx.Done(), txnID, nodeID)