Skip to content
This repository has been archived by the owner on Mar 26, 2020. It is now read-only.

Commit

Permalink
transaction: used etcd Election to elect cleanup leader
Browse files Browse the repository at this point in the history
Signed-off-by: Oshank Kumar <okumar@redhat.com>
  • Loading branch information
Oshank Kumar committed Nov 7, 2018
1 parent 607f08d commit 5181ab5
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 103 deletions.
116 changes: 56 additions & 60 deletions glusterd2/transactionv2/cleanuphandler/cleanup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,23 @@ package cleanuphandler

import (
"context"
"fmt"
"sync"
"time"

"github.com/gluster/glusterd2/glusterd2/gdctx"
"github.com/gluster/glusterd2/glusterd2/store"
txn "github.com/gluster/glusterd2/glusterd2/transaction"
"github.com/gluster/glusterd2/glusterd2/transactionv2"

"github.com/coreos/etcd/clientv3"
"github.com/pborman/uuid"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
)

const (
leaderKey = "leader"
cleanupTimerDur = time.Second * 5
txnMaxAge = time.Second * 20
electionTimerDur = time.Second * 10
leaderKey = "cleanup-leader"
cleanupTimerDur = time.Minute * 2
txnMaxAge = time.Second * 20
)

// CleanupLeader is responsible for performing all cleaning operation
Expand All @@ -37,23 +36,45 @@ type CleaupHandlerOptFunc func(handler *CleanupHandler) error
// by all peers involved in the transaction.
type CleanupHandler struct {
sync.Mutex
isLeader bool
locks txn.Locks
selfNodeID uuid.UUID
storeClient *clientv3.Client
stopChan chan struct{}
stopOnce sync.Once
txnManager transaction.TxnManager
isLeader bool
stopChan chan struct{}
stopOnce sync.Once
session *concurrency.Session
election *concurrency.Election
txnManager transaction.TxnManager
}

// WithSession returns an option that opens a new etcd concurrency session on
// the given client, using ttl (in seconds) as the session TTL, and stores it
// on the handler. The handler is left untouched when the session cannot be
// created, and the error is handed back to the caller applying the option.
func WithSession(client *clientv3.Client, ttl int) CleaupHandlerOptFunc {
	return func(handler *CleanupHandler) error {
		sess, err := concurrency.NewSession(client, concurrency.WithTTL(ttl))
		if err == nil {
			handler.session = sess
		}
		return err
	}
}

// WithElection creates a new election for CleanupHandler. It will use the
// `defaultSession` if no session has been configured previously, so it has to
// be applied after WithSession for a configured session to be picked up.
//
// The election key is "gluster-<cluster id>/<leaderKey>". An error is returned
// when neither a configured session nor a default session is available,
// because an election without a session cannot be campaigned on.
func WithElection(defaultSession *concurrency.Session) CleaupHandlerOptFunc {
	return func(handler *CleanupHandler) error {
		session := defaultSession
		if handler.session != nil {
			session = handler.session
		}
		// fail while the handler is being built instead of dereferencing a
		// nil session later, when the election is first used
		if session == nil {
			return fmt.Errorf("cleanup leader election requires an etcd session")
		}

		electionKeyPrefix := fmt.Sprintf("gluster-%s/%s", gdctx.MyClusterID.String(), leaderKey)
		handler.election = concurrency.NewElection(session, electionKeyPrefix)
		return nil
	}
}

// NewCleanupHandler returns a new CleanupHandler
func NewCleanupHandler(optFuncs ...CleaupHandlerOptFunc) (*CleanupHandler, error) {
cl := &CleanupHandler{
storeClient: store.Store.Client,
stopChan: make(chan struct{}),
txnManager: transaction.NewTxnManager(store.Store.Watcher),
selfNodeID: gdctx.MyUUID,
locks: make(txn.Locks),
stopChan: make(chan struct{}),
txnManager: transaction.NewTxnManager(store.Store.Watcher),
}

for _, optFunc := range optFuncs {
Expand Down Expand Up @@ -85,7 +106,6 @@ func (c *CleanupHandler) HandleStaleTxn() {
if isLeader {
c.txnManager.TxnGC(txnMaxAge)
}

}

// CleanFailedTxn removes all failed txn if rollback is
Expand All @@ -100,64 +120,40 @@ func (c *CleanupHandler) CleanFailedTxn() {
}
}

// Stop will stop running the CleanupHandler. The stop channel is closed at
// most once, however often Stop is called. While holding the handler mutex it
// then deletes the leader key from the store if this node is the leader and
// unlocks whatever is tracked in c.locks.
func (c *CleanupHandler) Stop() {
	log.Info("attempting to stop cleanup handler")
	c.stopOnce.Do(func() { close(c.stopChan) })

	c.Lock()
	defer c.Unlock()

	if c.isLeader {
		store.Delete(context.TODO(), leaderKey)
	}
	c.locks.UnLock(context.Background())
}

// StartElecting triggers a new election after every `electionTimerDur`.
// StartElecting triggers a new election campaign.
// If it succeeded then it assumes the leader role and returns
func (c *CleanupHandler) StartElecting() {
log.Info("node started to contest for leader election")

transaction.UntilSuccess(c.IsNodeElected, electionTimerDur, c.stopChan)
if err := c.election.Campaign(context.Background(), gdctx.MyUUID.String()); err != nil {
log.WithError(err).Error("failed in campaign for cleanup leader election")
c.Stop()
return
}

log.Info("node got elected as cleanup leader")
c.Lock()
defer c.Unlock()
c.isLeader = true
}

// IsNodeElected returns whether a node is elected as a leader or not.
// Leader attempts to set a common key using a transaction that checks
// if the key already exists. If not, the candidate leader sets the
// key with a lease and assumes the leader role.
func (c *CleanupHandler) IsNodeElected() bool {
var (
leaseID = store.Store.Session.Lease()
lockID = gdctx.MyClusterID.String()
logger = log.WithField("lockID", lockID)
)

if err := c.locks.Lock(lockID); err != nil {
logger.WithError(err).Error("error in acquiring lock")
return false
}
defer c.locks.UnLock(context.Background())

resp, err := store.Txn(context.Background()).
If(clientv3.Compare(clientv3.Version(leaderKey), "=", 0)).
Then(clientv3.OpPut(leaderKey, c.selfNodeID.String(), clientv3.WithLease(leaseID))).
Commit()

return (err == nil) && resp.Succeeded
// Stop will stop running the CleanupHandler. On the first call it closes the
// stop channel, resigns from the cleanup leader election, clears the leader
// flag and closes the session that was configured through WithSession.
// Further calls only log and are otherwise no-ops.
func (c *CleanupHandler) Stop() {
	log.Info("attempting to stop cleanup handler")
	c.stopOnce.Do(func() {
		close(c.stopChan)

		// election is only set by the WithElection option; a handler that
		// was built without it has nothing to resign from
		if c.election != nil {
			if err := c.election.Resign(context.Background()); err != nil {
				log.WithError(err).Error("failed to resign from cleanup leader election")
			}
		}

		// leadership has been given up, so leader-only work must stop
		c.Lock()
		c.isLeader = false
		c.Unlock()

		// c.session is only ever set by WithSession, so it is owned by this
		// handler and has to be released here to avoid leaking it
		if c.session != nil {
			if err := c.session.Close(); err != nil {
				log.WithError(err).Error("failed to close cleanup handler session")
			}
		}
	})
}

// StartCleanupLeader starts cleanup leader
func StartCleanupLeader() {
var err error

CleanupLeader, err = NewCleanupHandler()
CleanupLeader, err = NewCleanupHandler(
WithSession(store.Store.Client, 60),
WithElection(store.Store.Session),
)

if err != nil {
log.WithError(err).Errorf("failed in starting cleanup handler")
Expand Down
23 changes: 15 additions & 8 deletions glusterd2/transactionv2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package transaction
import (
"context"
"errors"
"github.com/gluster/glusterd2/glusterd2/store"
"sync"
"time"

"github.com/gluster/glusterd2/glusterd2/gdctx"
"github.com/gluster/glusterd2/glusterd2/store"

"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -66,7 +67,7 @@ func (txnEng *Engine) HandleTransaction() {
if !ok {
return
}
txn.Ctx.Logger().Debugf("received a pending txn")
txn.Ctx.Logger().Info("received a pending txn")
go txnEng.Execute(context.Background(), txn)
}
}
Expand Down Expand Up @@ -124,29 +125,34 @@ func (txnEng *Engine) executePendingTxn(ctx context.Context, txn *Txn) error {
stopch = make(chan struct{})
txnStatusChan = txnEng.txnManager.WatchTxnStatus(stopch, txn.ID, txnEng.selfNodeID)
updateOnce = &sync.Once{}
logger = txn.Ctx.Logger()
)
defer close(stopch)

txn.Ctx.Logger().Infof("transaction started on node")
logger.Infof("transaction started on node")

for i, step := range txn.Steps {
logger.WithField("stepname", step.DoFunc).Debug("running step func on node")

// a synchronized step is executed only after all previous steps
// have been completed successfully by all involved peers.
if step.Sync {
txn.Ctx.Logger().WithField("stepname", step.DoFunc).Debug("synchronizing txn step")
logger.WithField("stepname", step.DoFunc).Debug("synchronizing txn step")
if err := txnEng.stepManager.SyncStep(ctx, i, txn); err != nil {
txn.Ctx.Logger().WithFields(log.Fields{
logger.WithFields(log.Fields{
"error": err,
"stepname": step.DoFunc,
}).Error("encounter an error in synchronizing txn step")
return err
}
txn.Ctx.Logger().Debug("transaction got synchronized")
logger.Debug("transaction got synchronized")
}

if err := txnEng.stepManager.RunStep(ctx, step, txn.Ctx); err != nil {
txn.Ctx.Logger().WithFields(log.Fields{
logger.WithFields(log.Fields{
"error": err,
"stepname": step.DoFunc,
}).Error("failed in executing step ")
}).Error("failed in executing txn step ")
txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID)
return err
}
Expand All @@ -163,6 +169,7 @@ func (txnEng *Engine) executePendingTxn(ctx context.Context, txn *Txn) error {
default:
}

logger.WithField("stepname", step.DoFunc).Debug("step func executed successfully on node")
txnEng.txnManager.UpdateLastExecutedStep(i, txn.ID, txnEng.selfNodeID)
updateOnce.Do(func() {
txnEng.txnManager.UpDateTxnStatus(TxnStatus{State: txnRunning, TxnID: txn.ID}, txn.ID, txnEng.selfNodeID)
Expand Down
4 changes: 4 additions & 0 deletions glusterd2/transactionv2/steprunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,15 @@ func (sm *stepManager) shouldRunStep(step *transaction.Step) bool {
return false
}

// runStep first synchronises the transaction context's locally cached keys
// and values from the store, and then runs the step function registered under
// stepName on this node, returning whatever error that step function reports.
func (sm *stepManager) runStep(ctx context.Context, stepName string, txnCtx transaction.TxnCtx) error {
	// the cache is refreshed before the step runs, never after
	txnCtx.SyncCache()

	err := transaction.RunStepFuncLocally(ctx, stepName, txnCtx)
	return err
}

// isPrevStepsExecutedOnNode reports that all previous steps
// have been completed successfully on a given node
func (sm *stepManager) isPrevStepsExecutedOnNode(ctx context.Context, syncStepIndex int, nodeID uuid.UUID, txnID uuid.UUID, success chan<- struct{}) {
txnManager := NewTxnManager(store.Store.Watcher)
lastStepWatchChan := txnManager.WatchLastExecutedStep(ctx.Done(), txnID, nodeID)
Expand Down
35 changes: 0 additions & 35 deletions glusterd2/transactionv2/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,38 +71,3 @@ func getCallers() (callers string) {
}
return
}

// UntilSuccess calls conditionFunc repeatedly, waiting d between attempts,
// and returns as soon as conditionFunc reports true or the stop channel is
// closed. The stop channel is checked before every attempt and while waiting
// for the next one.
func UntilSuccess(conditionFunc func() bool, d time.Duration, stop <-chan struct{}) {
	var (
		timer *time.Timer
		fired bool
	)

	// attempt runs conditionFunc once with HandlePanic deferred around it.
	// NOTE(review): presumably HandlePanic recovers, in which case a panicking
	// attempt counts as a failed one - confirm against its definition.
	attempt := func() (ok bool) {
		defer HandlePanic()
		ok = conditionFunc()
		return ok
	}

	for {
		// do not start another attempt once stop has been requested
		select {
		case <-stop:
			return
		default:
		}

		if attempt() {
			return
		}

		// fired tells ResetTimer whether the previous wait ended by
		// receiving from the timer channel
		timer = ResetTimer(timer, d, fired)

		select {
		case <-stop:
			return
		case <-timer.C:
			fired = true
		}
	}
}

0 comments on commit 5181ab5

Please sign in to comment.