Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue 887 #987

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 98 additions & 2 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
}

// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, dropCutOverSentryTableOnce *sync.Once) error {
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable, unlockGhostTableDone <-chan bool, tableUnlocked chan<- error, dropCutOverSentryTableOnce *sync.Once, okToUnlockGhostTable chan<- bool) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
Expand Down Expand Up @@ -876,6 +876,17 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
}
})

okToUnlockGhostTable <- true
// Release the original table lock last.
// On failure paths unlockGhostTableDone must still be signaled, so that the original table lock is always released.

select {
case <-unlockGhostTableDone:
this.migrationContext.Log.Infof("Receive ghost table unlocked channel, unlock tables now")
case <-time.After(time.Duration(time.Second)):
this.migrationContext.Log.Errorf("Wait unlock ghost table timeout, force unlock tables now")
}

// Tables still locked
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
Expand All @@ -894,7 +905,21 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
}

// AtomicCutoverRename
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
func (this *Applier) AtomicCutoverRename(sessionIdChan, lockGhostSessionIdChan chan int64, tablesRenamed, ghostTableLocked, ghostTableUnlocked chan error, okToUnlockGhostTable, unlockGhostTableDone chan bool) error {
// Lock the ghost table before the rename; once it is locked, the locking goroutine waits on the okToUnlockGhostTable channel.
// Sequence: lock original & magic tables (session1) -> lock ghost table (session2) -> cut-over rename (session3, blocked) ->
// drop magic table (session1) -> unlock ghost table (session2) -> unlock original table (session1)
go func() {
if err := this.AtomicCutOverGhostLock(ghostTableLocked, ghostTableUnlocked, lockGhostSessionIdChan, okToUnlockGhostTable, unlockGhostTableDone); err != nil {
this.migrationContext.Log.Errore(err)
}
}()

if err := <-ghostTableLocked; err != nil {
sessionIdChan <- -1
return this.migrationContext.Log.Errore(err)
}

tx, err := this.db.Begin()
if err != nil {
return err
Expand Down Expand Up @@ -936,6 +961,77 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
return nil
}

// AtomicCutOverGhostLock opens a dedicated transaction, takes a WRITE lock on
// the ghost table and holds it until okToUnlockGhostTable is signaled, at which
// point it issues `unlock tables`.
//
// Channel protocol:
//   - lockGhostSessionIdChan receives the connection id of the locking session
//     once it is known.
//   - ghostTableLocked receives nil once the ghost table is locked, or the error
//     that prevented locking.
//   - okToUnlockGhostTable is read once; the read blocks until the caller allows
//     the ghost table to be unlocked.
//   - unlockGhostTableDone receives true after the unlock has been attempted.
//   - ghostTableUnlocked receives nil after a successful unlock, or the unlock
//     error.
//
// On every exit path (success or failure) one extra value is pushed on each
// outbound channel so that no reader stays blocked; callers are expected to
// provide buffered channels able to absorb those extra values.
//
// Returns nil on success, or the first error encountered.
func (this *Applier) AtomicCutOverGhostLock(ghostTableLocked, ghostTableUnlocked chan<- error, lockGhostSessionIdChan chan int64, okToUnlockGhostTable <-chan bool, unlockGhostTableDone chan<- bool) error {
	// releaseWaiters injects a value into every outbound channel so that any
	// goroutine still blocked reading from one of them is released.
	releaseWaiters := func() {
		unlockGhostTableDone <- true
		lockGhostSessionIdChan <- -1
		ghostTableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverGhostLock(), injected to release blocking channel reads")
		ghostTableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverGhostLock(), injected to release blocking channel reads")
	}

	tx, err := this.db.Begin()
	if err != nil {
		// No transaction exists yet, so the deferred cleanup below is not
		// registered; report the failure and release the waiters explicitly,
		// otherwise readers of ghostTableLocked / lockGhostSessionIdChan
		// would block forever.
		ghostTableLocked <- err
		releaseWaiters()
		return err
	}
	defer func() {
		tx.Rollback()
		releaseWaiters()
	}()

	var sessionId int64
	if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
		return err
	}
	lockGhostSessionIdChan <- sessionId

	lockResult := 0
	query := `select get_lock(?, 0)`
	lockName := this.GetSessionLockName(sessionId)
	this.migrationContext.Log.Infof("Grabbing voluntary lock: %s", lockName)
	if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
		lockErr := fmt.Errorf("Unable to acquire lock %s", lockName)
		if err != nil {
			// Keep the underlying query error instead of discarding it.
			lockErr = fmt.Errorf("Unable to acquire lock %s: %v", lockName, err)
		}
		ghostTableLocked <- lockErr
		return lockErr
	}

	this.migrationContext.Log.Infof("Setting lock ghost table timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
	query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
	if _, err := tx.Exec(query); err != nil {
		// The deferred cleanup reports an error on ghostTableLocked.
		return err
	}

	query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(this.migrationContext.GetGhostTableName()),
	)
	this.migrationContext.Log.Infof("Locking %s.%s",
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(this.migrationContext.GetGhostTableName()),
	)

	this.migrationContext.Log.Infof("Issuing and expecting get the %s.%s table lock: %s", this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName(), query)
	if _, err := tx.Exec(query); err != nil {
		ghostTableLocked <- err
		return this.migrationContext.Log.Errore(err)
	}

	this.migrationContext.Log.Infof("Ghost table locked")
	ghostTableLocked <- nil // No error.

	// Hold the ghost table lock until the caller allows it to be released.
	<-okToUnlockGhostTable
	this.migrationContext.Log.Infof("Will now proceed to unlock ghost table")

	this.migrationContext.Log.Infof("Releasing lock from %s.%s",
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(this.migrationContext.GetGhostTableName()),
	)
	query = `unlock tables`
	if _, err := tx.Exec(query); err != nil {
		// Report the failure and stop here: falling through would log a
		// success, send a contradictory nil and return nil. The deferred
		// cleanup still signals unlockGhostTableDone.
		ghostTableUnlocked <- err
		return this.migrationContext.Log.Errore(err)
	}
	unlockGhostTableDone <- true
	this.migrationContext.Log.Infof("Ghost table unlocked")
	ghostTableUnlocked <- nil
	return nil
}

func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
Expand Down
20 changes: 18 additions & 2 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,12 @@ func (this *Migrator) atomicCutOver() (err error) {
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)

okToUnlockTable := make(chan bool, 4)
okToUnlockGhostTable := make(chan bool, 4)
unlockGhostTableDone := make(chan bool, 4)
var dropCutOverSentryTableOnce sync.Once
defer func() {
okToUnlockTable <- true
unlockGhostTableDone <- true
dropCutOverSentryTableOnce.Do(func() {
this.applier.DropAtomicCutOverSentryTableIfExists()
})
Expand All @@ -648,7 +651,7 @@ func (this *Migrator) atomicCutOver() (err error) {
tableLocked := make(chan error, 2)
tableUnlocked := make(chan error, 2)
go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &dropCutOverSentryTableOnce); err != nil {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, unlockGhostTableDone, tableUnlocked, &dropCutOverSentryTableOnce, okToUnlockGhostTable); err != nil {
this.migrationContext.Log.Errore(err)
}
}()
Expand All @@ -670,13 +673,23 @@ func (this *Migrator) atomicCutOver() (err error) {
var tableRenameKnownToHaveFailed int64
renameSessionIdChan := make(chan int64, 2)
tablesRenamed := make(chan error, 2)

ghostTableLocked := make(chan error, 2)
ghostTableUnlocked := make(chan error, 2)
lockGhostSessionIdChan := make(chan int64, 2)

go func() {
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, lockGhostSessionIdChan, tablesRenamed, ghostTableLocked, ghostTableUnlocked, okToUnlockGhostTable, unlockGhostTableDone); err != nil {
// Abort! Release the lock
atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
okToUnlockTable <- true
okToUnlockGhostTable <- true
unlockGhostTableDone <- true
}
}()
lockGhoSessionId := <-lockGhostSessionIdChan
this.migrationContext.Log.Infof("Session locking ghost table is %+v", lockGhoSessionId)

renameSessionId := <-renameSessionIdChan
this.migrationContext.Log.Infof("Session renaming tables is %+v", renameSessionId)

Expand Down Expand Up @@ -709,6 +722,9 @@ func (this *Migrator) atomicCutOver() (err error) {
okToUnlockTable <- true
// BAM! magic table dropped, original table lock is released
// -> RENAME released -> queries on original are unblocked.
if err := <-ghostTableUnlocked; err != nil {
return this.migrationContext.Log.Errore(err)
}
if err := <-tableUnlocked; err != nil {
return this.migrationContext.Log.Errore(err)
}
Expand Down