Skip to content
This repository has been archived by the owner on Mar 26, 2020. It is now read-only.

transaction: Resilient Txn Engine #1268

Merged
merged 7 commits into from
Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions glusterd2/commands/peers/peer-rpc-svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"github.com/gluster/glusterd2/glusterd2/peer"
"github.com/gluster/glusterd2/glusterd2/servers/peerrpc"
"github.com/gluster/glusterd2/glusterd2/store"
"github.com/gluster/glusterd2/glusterd2/transactionv2"
"github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/pkg/utils"
"github.com/pborman/uuid"

"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -169,6 +171,8 @@ func ReconfigureStore(c *StoreConfig) error {

// Stop events framework
events.Stop()
transaction.StopTxnEngine()
cleanuphandler.StopCleanupLeader()

// do not delete cluster namespace if this is not a loner node
var deleteNamespace bool
Expand Down Expand Up @@ -217,7 +221,8 @@ func ReconfigureStore(c *StoreConfig) error {

// Now that new store is up, start events framework
events.Start()

transaction.StartTxnEngine()
cleanuphandler.StartCleanupLeader()
return nil
}

Expand Down
1 change: 1 addition & 0 deletions glusterd2/commands/snapshot/snapshot-activate.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func snapshotActivateHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "snap-activate.StoreSnapshot",
UndoFunc: "snap-activate.UndoStoreSnapshot",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
}
if err = txn.Do(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions glusterd2/commands/snapshot/snapshot-clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func snapshotCloneHandler(w http.ResponseWriter, r *http.Request) {
{
DoFunc: "snap-clone.CreateCloneVolinfo",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
{
DoFunc: "snap-clone.TakeBrickSnapshots",
Expand All @@ -332,6 +333,7 @@ func snapshotCloneHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "snap-clone.StoreSnapshot",
UndoFunc: "snap-clone.UndoStoreSnapshotOnClone",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
}
if err = txn.Ctx.Set("snapname", &snapname); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions glusterd2/commands/snapshot/snapshot-create.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ func snapshotCreateHandler(w http.ResponseWriter, r *http.Request) {
{
DoFunc: "snap-create.CreateSnapinfo",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
{
DoFunc: "snap-create.ActivateBarrier",
Expand All @@ -774,6 +775,8 @@ func snapshotCreateHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "snap-create.TakeBrickSnapshots",
UndoFunc: "snap-create.UndoBrickSnapshots",
Nodes: txn.Nodes,
// All bricks need to be barriered before taking a snapshot
Sync: true,
},
{
DoFunc: "snap-create.DeactivateBarrier",
Expand All @@ -784,6 +787,7 @@ func snapshotCreateHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "snap-create.StoreSnapshot",
UndoFunc: "snap-create.UndoStoreSnapshotOnCreate",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
}

Expand Down
1 change: 1 addition & 0 deletions glusterd2/commands/snapshot/snapshot-deactivate.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func snapshotDeactivateHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "snap-deactivate.StoreSnapshot",
UndoFunc: "snap-deactivate.UndoStoreSnapshot",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
}
if err = txn.Ctx.Set("oldsnapinfo", &snapinfo); err != nil {
Expand Down
1 change: 1 addition & 0 deletions glusterd2/commands/snapshot/snapshot-delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func snapshotDeleteHandler(w http.ResponseWriter, r *http.Request) {
{
DoFunc: "snap-delete.Store",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
}

Expand Down
1 change: 1 addition & 0 deletions glusterd2/commands/snapshot/snapshot-restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ func snapshotRestoreHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "snap-restore.Store",
UndoFunc: "snap-restore.UndoStore",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
{
DoFunc: "snap-restore.CleanBricks",
Expand Down
2 changes: 2 additions & 0 deletions glusterd2/commands/volumes/brick-replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ LOOP:
{
DoFunc: "brick-replace.ReplaceVolinfo",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
{
DoFunc: "vol-create.InitBricks",
Expand All @@ -175,6 +176,7 @@ LOOP:
DoFunc: "vol-create.StoreVolume",
UndoFunc: "vol-create.UndoStoreVolume",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
{
DoFunc: "vol-expand.NotifyClients",
Expand Down
6 changes: 5 additions & 1 deletion glusterd2/commands/volumes/volume-create.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/gluster/glusterd2/glusterd2/gdctx"
restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils"
"github.com/gluster/glusterd2/glusterd2/transaction"
transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/pkg/api"
gderrors "github.com/gluster/glusterd2/pkg/errors"
Expand Down Expand Up @@ -174,7 +175,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
return
}

txn, err := transaction.NewTxnWithLocks(ctx, req.Name)
txn, err := transactionv2.NewTxnWithLocks(ctx, req.Name)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
Expand All @@ -196,6 +197,8 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
{
DoFunc: "vol-create.ValidateBricks",
Nodes: nodes,
// Need to wait for volinfo to be created first
Sync: true,
},
{
DoFunc: "vol-create.InitBricks",
Expand All @@ -206,6 +209,7 @@ func volumeCreateHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "vol-create.StoreVolume",
UndoFunc: "vol-create.UndoStoreVolume",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
}

Expand Down
1 change: 1 addition & 0 deletions glusterd2/commands/volumes/volume-delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func volumeDeleteHandler(w http.ResponseWriter, r *http.Request) {
{
DoFunc: "vol-delete.Store",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
}

Expand Down
4 changes: 4 additions & 0 deletions glusterd2/commands/volumes/volume-expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ func volumeExpandHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "vol-expand.ValidateBricks",
Nodes: nodes,
Skip: lvmResizeOp,
// Need to wait for newly selected bricks to be set by the previous step
Sync: true,
},
{
DoFunc: "vol-expand.InitBricks",
Expand All @@ -193,11 +195,13 @@ func volumeExpandHandler(w http.ResponseWriter, r *http.Request) {
UndoFunc: "vol-create.UndoStoreVolume",
Nodes: []uuid.UUID{gdctx.MyUUID},
Skip: !lvmResizeOp,
Sync: true,
},
{
DoFunc: "vol-expand.UpdateVolinfo",
Nodes: []uuid.UUID{gdctx.MyUUID},
Skip: lvmResizeOp,
Sync: true,
},
{
DoFunc: "vol-expand.GenerateBrickVolfiles",
Expand Down
1 change: 1 addition & 0 deletions glusterd2/commands/volumes/volume-option.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func volumeOptionsHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "vol-option.UpdateVolinfo",
UndoFunc: "vol-option.UpdateVolinfo.Undo",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
{
DoFunc: "vol-option.XlatorActionDoSet",
Expand Down
2 changes: 2 additions & 0 deletions glusterd2/commands/volumes/volume-reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,13 @@ func volumeResetHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "vol-option.UpdateVolinfo",
UndoFunc: "vol-option.UpdateVolinfo.Undo",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
{
DoFunc: "vol-option.GenerateBrickVolfiles",
UndoFunc: "vol-option.GenerateBrickvolfiles.Undo",
Nodes: volinfo.Nodes(),
Sync: true,
},
{
DoFunc: "vol-option.NotifyVolfileChange",
Expand Down
1 change: 1 addition & 0 deletions glusterd2/commands/volumes/volume-start.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func volumeStartHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "vol-start.UpdateVolinfo",
UndoFunc: "vol-start.UpdateVolinfo.Undo",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
{
DoFunc: "vol-start.XlatorActionDoVolumeStart",
Expand Down
1 change: 1 addition & 0 deletions glusterd2/commands/volumes/volume-stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func volumeStopHandler(w http.ResponseWriter, r *http.Request) {
DoFunc: "vol-stop.UpdateVolinfo",
UndoFunc: "vol-stop.UpdateVolinfo.Undo",
Nodes: []uuid.UUID{gdctx.MyUUID},
Sync: true,
},
{
DoFunc: "vol-stop.XlatorActionDoVolumeStop",
Expand Down
6 changes: 6 additions & 0 deletions glusterd2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/gluster/glusterd2/glusterd2/pmap"
"github.com/gluster/glusterd2/glusterd2/servers"
"github.com/gluster/glusterd2/glusterd2/store"
"github.com/gluster/glusterd2/glusterd2/transactionv2"
"github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler"
gdutils "github.com/gluster/glusterd2/glusterd2/utils"
"github.com/gluster/glusterd2/glusterd2/volgen"
"github.com/gluster/glusterd2/glusterd2/xlator"
Expand Down Expand Up @@ -105,6 +107,8 @@ func main() {
log.WithError(err).Fatal("Failed to initialize store (etcd client)")
}

transaction.StartTxnEngine()
cleanuphandler.StartCleanupLeader()
// Start the events framework after store is up
if err := events.Start(); err != nil {
log.WithError(err).Fatal("Failed to start internal events framework")
Expand Down Expand Up @@ -168,6 +172,8 @@ func main() {
case unix.SIGINT:
log.Info("Received SIGTERM. Stopping GlusterD")
gdctx.IsTerminating = true
transaction.StopTxnEngine()
cleanuphandler.StopCleanupLeader()
super.Stop()
events.Stop()
store.Close()
Expand Down
45 changes: 31 additions & 14 deletions glusterd2/transaction/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,32 @@ type TxnCtx interface {
// Logger returns the Logrus logger associated with the context
Logger() log.FieldLogger

// commit writes all locally cached keys and values into the store using
// Commit writes all locally cached keys and values into the store using
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be clear here. You're exporting this and the other types and functions in this package, because of the new package, right? Once we refactor out the old package, these can go back to being unexported.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes Kaushal, I have made few things exported because I wanted to use in new package

// a single etcd transaction. This is for internal use by the txn framework
// and hence isn't exported.
commit() error
Commit() error

// SyncCache synchronizes the locally cached keys and values from the store
SyncCache() error
}

// Tctx represents structure for transaction context
type Tctx struct {
config *txnCtxConfig // this will be marshalled and sent on wire
config *TxnCtxConfig // this will be marshalled and sent on wire
logger log.FieldLogger
readSet map[string][]byte // cached responses from store
readCacheDirty bool
writeSet map[string]string // to be written to store
}

// txnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx
// TxnCtxConfig is marshalled and sent on wire and is used to reconstruct Tctx
// on receiver's end.
type txnCtxConfig struct {
type TxnCtxConfig struct {
LogFields log.Fields
StorePrefix string
}

func newCtx(config *txnCtxConfig) *Tctx {
func newCtx(config *TxnCtxConfig) *Tctx {
return &Tctx{
config: config,
logger: log.StandardLogger().WithFields(config.LogFields),
Expand All @@ -65,6 +68,11 @@ func newCtx(config *txnCtxConfig) *Tctx {
}
}

// NewCtx returns a transaction context from given config
func NewCtx(config *TxnCtxConfig) *Tctx {
return newCtx(config)
}

// Set attaches the given key-value pair to the context.
// If the key exists, the value will be updated.
func (c *Tctx) Set(key string, value interface{}) error {
Expand All @@ -86,9 +94,22 @@ func (c *Tctx) Set(key string, value interface{}) error {
return nil
}

// commit writes all locally cached keys and values into the store using
// SyncCache synchronizes the locally cached keys and values from the store
func (c *Tctx) SyncCache() error {
resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix())
if err != nil {
return err
}

for _, kv := range resp.Kvs {
c.readSet[string(kv.Key)] = kv.Value
}
return nil
}

// Commit writes all locally cached keys and values into the store using
// a single etcd transaction.
func (c *Tctx) commit() error {
func (c *Tctx) Commit() error {

if len(c.writeSet) == 0 {
return nil
Expand Down Expand Up @@ -120,6 +141,7 @@ func (c *Tctx) commit() error {

expTxn.Add("txn_ctx_store_commit", 1)

c.writeSet = make(map[string]string)
c.readCacheDirty = true

return nil
Expand All @@ -139,15 +161,10 @@ func (c *Tctx) Get(key string, value interface{}) error {

// cache all keys and values from the store on the first call to Get
if c.readCacheDirty {
resp, err := store.Get(context.TODO(), c.config.StorePrefix, clientv3.WithPrefix())
if err != nil {
if err := c.SyncCache(); err != nil {
c.logger.WithError(err).WithField("key", key).Error("failed to get key from transaction context")
return err
}
expTxn.Add("txn_ctx_store_get", 1)
for _, kv := range resp.Kvs {
c.readSet[string(kv.Key)] = kv.Value
}
c.readCacheDirty = false
}

Expand Down
Loading