Skip to content
This repository has been archived by the owner on Mar 26, 2020. It is now read-only.

Commit

Permalink
transaction: Implemented Txn Engine
Browse files Browse the repository at this point in the history
The transaction engine executes the given transaction
across the cluster.The engine is designed to make use
of etcd as the means of communication between peers.

Please refer Design Document #1003

cleanup leader: added a cleanup leader which will perform
all cleaning operation.

A leader is elected among the peers in the cluster to
cleanup stale transactions. The leader periodically
scans the pending transaction namespace for failed
and stale transactions, and cleans them up if rollback
is completed by all peers involved in the transaction.

Signed-off-by: Oshank Kumar <okumar@redhat.com>
  • Loading branch information
Oshank Kumar committed Nov 7, 2018
1 parent 4cf71aa commit 607f08d
Show file tree
Hide file tree
Showing 16 changed files with 1,425 additions and 39 deletions.
9 changes: 7 additions & 2 deletions glusterd2/commands/peers/peer-rpc-svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"github.com/gluster/glusterd2/glusterd2/peer"
"github.com/gluster/glusterd2/glusterd2/servers/peerrpc"
"github.com/gluster/glusterd2/glusterd2/store"
"github.com/gluster/glusterd2/glusterd2/transactionv2"
"github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/pkg/utils"
"github.com/pborman/uuid"

"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -169,6 +171,8 @@ func ReconfigureStore(c *StoreConfig) error {

// Stop events framework
events.Stop()
transaction.StopTxnEngine()
cleanuphandler.StopCleanupLeader()

// do not delete cluster namespace if this is not a loner node
var deleteNamespace bool
Expand Down Expand Up @@ -217,7 +221,8 @@ func ReconfigureStore(c *StoreConfig) error {

// Now that new store is up, start events framework
events.Start()

transaction.StartTxnEngine()
cleanuphandler.StartCleanupLeader()
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion glusterd2/commands/volumes/volume-create.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/gluster/glusterd2/glusterd2/gdctx"
restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils"
"github.com/gluster/glusterd2/glusterd2/transaction"
transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/pkg/api"
gderrors "github.com/gluster/glusterd2/pkg/errors"
Expand Down Expand Up @@ -154,7 +155,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
return
}

txn, err := transaction.NewTxnWithLocks(ctx, req.Name)
txn, err := transactionv2.NewTxnWithLocks(ctx, req.Name)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
Expand All @@ -175,6 +176,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
},
{
DoFunc: "vol-create.ValidateBricks",
Sync: true,
Nodes: nodes,
},
{
Expand Down
6 changes: 6 additions & 0 deletions glusterd2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/gluster/glusterd2/glusterd2/pmap"
"github.com/gluster/glusterd2/glusterd2/servers"
"github.com/gluster/glusterd2/glusterd2/store"
"github.com/gluster/glusterd2/glusterd2/transactionv2"
"github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler"
gdutils "github.com/gluster/glusterd2/glusterd2/utils"
"github.com/gluster/glusterd2/glusterd2/xlator"
"github.com/gluster/glusterd2/pkg/errors"
Expand Down Expand Up @@ -103,6 +105,8 @@ func main() {
log.WithError(err).Fatal("Failed to initialize store (etcd client)")
}

transaction.StartTxnEngine()
cleanuphandler.StartCleanupLeader()
// Start the events framework after store is up
if err := events.Start(); err != nil {
log.WithError(err).Fatal("Failed to start internal events framework")
Expand Down Expand Up @@ -156,6 +160,8 @@ func main() {
case unix.SIGINT:
log.Info("Received SIGTERM. Stopping GlusterD")
gdctx.IsTerminating = true
transaction.StopTxnEngine()
cleanuphandler.StopCleanupLeader()
super.Stop()
events.Stop()
store.Close()
Expand Down
45 changes: 31 additions & 14 deletions glusterd2/transaction/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,32 @@ type TxnCtx interface {
// Logger returns the Logrus logger associated with the context
Logger() log.FieldLogger

// commit writes all locally cached keys and values into the store using
// Commit writes all locally cached keys and values into the store using
// a single etcd transaction. This is for internal use by the txn framework
// and hence isn't exported.
commit() error
Commit() error

// SyncCache synchronizes the locally cached keys and values from the store
SyncCache() error
}

// Tctx represents structure for transaction context
type Tctx struct {
config *txnCtxConfig // this will be marshalled and sent on wire
config *TxnCtxConfig // this will be marshalled and sent on wire
logger log.FieldLogger
readSet map[string][]byte // cached responses from store
readCacheDirty bool
writeSet map[string]string // to be written to store
}

// txnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx
// TxnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx
// on receiver's end.
type txnCtxConfig struct {
type TxnCtxConfig struct {
LogFields log.Fields
StorePrefix string
}

func newCtx(config *txnCtxConfig) *Tctx {
func newCtx(config *TxnCtxConfig) *Tctx {
return &Tctx{
config: config,
logger: log.StandardLogger().WithFields(config.LogFields),
Expand All @@ -65,6 +68,11 @@ func newCtx(config *txnCtxConfig) *Tctx {
}
}

// NewCtx returns a transaction context from given config
func NewCtx(config *TxnCtxConfig) *Tctx {
return newCtx(config)
}

// Set attaches the given key-value pair to the context.
// If the key exists, the value will be updated.
func (c *Tctx) Set(key string, value interface{}) error {
Expand All @@ -86,9 +94,22 @@ func (c *Tctx) Set(key string, value interface{}) error {
return nil
}

// commit writes all locally cached keys and values into the store using
// SyncCache synchronizes the locally cached keys and values from the store
func (c *Tctx) SyncCache() error {
resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix())
if err != nil {
return err
}

for _, kv := range resp.Kvs {
c.readSet[string(kv.Key)] = kv.Value
}
return nil
}

// Commit writes all locally cached keys and values into the store using
// a single etcd transaction.
func (c *Tctx) commit() error {
func (c *Tctx) Commit() error {

if len(c.writeSet) == 0 {
return nil
Expand Down Expand Up @@ -120,6 +141,7 @@ func (c *Tctx) commit() error {

expTxn.Add("txn_ctx_store_commit", 1)

c.writeSet = make(map[string]string)
c.readCacheDirty = true

return nil
Expand All @@ -139,15 +161,10 @@ func (c *Tctx) Get(key string, value interface{}) error {

// cache all keys and values from the store on the first call to Get
if c.readCacheDirty {
resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix())
if err != nil {
if err := c.SyncCache(); err != nil {
c.logger.WithError(err).WithField("key", key).Error("failed to get key from transaction context")
return err
}
expTxn.Add("txn_ctx_store_get", 1)
for _, kv := range resp.Kvs {
c.readSet[string(kv.Key)] = kv.Value
}
c.readCacheDirty = false
}

Expand Down
34 changes: 20 additions & 14 deletions glusterd2/transaction/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func CreateLockSteps(key string) (*Step, *Step, error) {
return nil, nil, err
}

lockStep := &Step{lockFunc, unlockFunc, []uuid.UUID{gdctx.MyUUID}, false}
unlockStep := &Step{unlockFunc, "", []uuid.UUID{gdctx.MyUUID}, false}
lockStep := &Step{DoFunc: lockFunc, UndoFunc: unlockFunc, Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false}
unlockStep := &Step{DoFunc: unlockFunc, UndoFunc: "", Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false}

return lockStep, unlockStep, nil
}
Expand Down Expand Up @@ -149,15 +149,15 @@ func CreateLockFuncs(key string) (LockUnlockFunc, LockUnlockFunc) {
return lockFunc, unlockFunc
}

func (t *Txn) lock(lockID string) error {
// Locks are the collection of cluster wide transaction lock
type Locks map[string]*concurrency.Mutex

func (l Locks) lock(lockID string) error {
// Ensure that no prior lock exists for the given lockID in this transaction
if _, ok := t.locks[lockID]; ok {
if _, ok := l[lockID]; ok {
return ErrLockExists
}

logger := t.Ctx.Logger().WithField("lockID", lockID)
logger.Debug("attempting to obtain lock")

key := lockPrefix + lockID
locker := concurrency.NewMutex(store.Store.Session, key)

Expand All @@ -167,32 +167,38 @@ func (t *Txn) lock(lockID string) error {
err := locker.Lock(ctx)
switch err {
case nil:
logger.Debug("lock obtained")
// Attach lock to the transaction
t.locks[lockID] = locker
l[lockID] = locker

case context.DeadlineExceeded:
// Propagate this all the way back to the client as a HTTP 409 response
logger.Debug("timeout: failed to obtain lock")
err = ErrLockTimeout

default:
logger.WithError(err).Error("failed to obtain lock")
}

return err
}

// Lock obtains a cluster wide transaction lock on the given lockID/lockIDs,
// and attaches the obtained locks to the transaction
func (t *Txn) Lock(lockID string, lockIDs ...string) error {
if err := t.lock(lockID); err != nil {
func (l Locks) Lock(lockID string, lockIDs ...string) error {
if err := l.lock(lockID); err != nil {
return err
}
for _, id := range lockIDs {
if err := t.lock(id); err != nil {
if err := l.lock(id); err != nil {
return err
}
}
return nil
}

// UnLock releases all cluster wide obtained locks
func (l Locks) UnLock(ctx context.Context) {
for lockID, locker := range l {
if err := locker.Unlock(ctx); err == nil {
delete(l, lockID)
}
}
}
2 changes: 1 addition & 1 deletion glusterd2/transaction/rpc-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (p *txnSvc) RunStep(rpcCtx context.Context, req *TxnStepReq) (*TxnStepResp,
goto End
}

if err = ctx.commit(); err != nil {
if err = ctx.Commit(); err != nil {
logger.WithError(err).Error("failed to commit txn context to store")
}

Expand Down
8 changes: 5 additions & 3 deletions glusterd2/transaction/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Step struct {
UndoFunc string
Nodes []uuid.UUID
Skip bool
Sync bool
}

var (
Expand Down Expand Up @@ -136,7 +137,7 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod

var err error
if uuid.Equal(node, gdctx.MyUUID) {
err = runStepFuncLocally(origCtx, stepName, ctx)
err = RunStepFuncLocally(origCtx, stepName, ctx)
} else {
// remote node
err = runStepOn(origCtx, stepName, node, ctx)
Expand All @@ -145,7 +146,8 @@ func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, nod
respCh <- stepPeerResp{node, err}
}

func runStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error {
// RunStepFuncLocally runs a step func on local node
func RunStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error {

var err error

Expand All @@ -163,7 +165,7 @@ func runStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) er
if err = stepFunc(ctx); err == nil {
// if step function executes successfully, commit the
// results to the store
err = ctx.commit()
err = ctx.Commit()
}
} else {
err = ErrStepFuncNotFound
Expand Down
14 changes: 10 additions & 4 deletions glusterd2/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var expTxn = expvar.NewMap("txn")
// Txn is a set of steps
type Txn struct {
id uuid.UUID
locks map[string]*concurrency.Mutex
locks Locks
reqID uuid.UUID
storePrefix string

Expand All @@ -49,7 +49,7 @@ func NewTxn(ctx context.Context) *Txn {
t.reqID = gdctx.GetReqID(ctx)
t.locks = make(map[string]*concurrency.Mutex)
t.storePrefix = txnPrefix + t.id.String() + "/"
config := &txnCtxConfig{
config := &TxnCtxConfig{
LogFields: log.Fields{
"txnid": t.id.String(),
"reqid": t.reqID.String(),
Expand All @@ -68,10 +68,16 @@ func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) {
t := NewTxn(ctx)

for _, id := range lockIDs {
if err := t.Lock(id); err != nil {
logger := t.Ctx.Logger().WithField("lockID", id)
logger.Debug("attempting to obtain lock")

if err := t.locks.Lock(id); err != nil {
logger.WithError(err).Error("failed to obtain lock")
t.Done()
return nil, err
}

logger.Debug("lock obtained")
}

return t, nil
Expand Down Expand Up @@ -125,7 +131,7 @@ func (t *Txn) Do() error {
expTxn.Add("initiated_txn_in_progress", 1)

// commit txn.Ctx.Set()s done in REST handlers to the store
if err := t.Ctx.commit(); err != nil {
if err := t.Ctx.Commit(); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 607f08d

Please sign in to comment.