Skip to content

Commit

Permalink
rename serverLock to tableLock to not confuse semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed Jul 28, 2024
1 parent 040a4f3 commit f201252
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 11 deletions.
6 changes: 3 additions & 3 deletions pkg/checksum/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,15 +332,15 @@ func (c *Checker) initConnPool(ctx context.Context) error {
// Lock the source table in a trx
// so the connection is not used by others
c.logger.Info("starting checksum operation, this will require a table lock")
serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger)
tableLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger)
if err != nil {
return err
}
defer serverLock.Close()
defer tableLock.Close()
// With the lock held, flush one more time under the lock tables.
// Because we know canal is up to date this now guarantees
// we have everything in the new table.
if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil {
if err := c.feed.FlushUnderTableLock(ctx, tableLock); err != nil {
return err
}
// Assert that the change set is empty. This should always
Expand Down
11 changes: 5 additions & 6 deletions pkg/migration/cutover.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,21 @@ func (c *CutOver) Run(ctx context.Context) error {
func (c *CutOver) algorithmRenameUnderLock(ctx context.Context) error {
// Lock the source table in a trx
// so the connection is not used by others
serverLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger)
tableLock, err := dbconn.NewTableLock(ctx, c.db, c.table, c.dbConfig, c.logger)
if err != nil {
return err
}
defer serverLock.Close()
if err := c.feed.FlushUnderLock(ctx, serverLock); err != nil {
defer tableLock.Close()
if err := c.feed.FlushUnderTableLock(ctx, tableLock); err != nil {
return err
}
if !c.feed.AllChangesFlushed() {
return errors.New("not all changes flushed, final flush might be broken")
}
oldName := c.oldTableName
oldQuotedName := fmt.Sprintf("`%s`.`%s`", c.table.SchemaName, oldName)
oldQuotedName := fmt.Sprintf("`%s`.`%s`", c.table.SchemaName, c.oldTableName)
renameStatement := fmt.Sprintf("RENAME TABLE %s TO %s, %s TO %s",
c.table.QuotedName, oldQuotedName,
c.newTable.QuotedName, c.table.QuotedName,
)
return serverLock.ExecUnderLock(ctx, renameStatement)
return tableLock.ExecUnderLock(ctx, renameStatement)
}
4 changes: 2 additions & 2 deletions pkg/repl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,15 @@ func (c *Client) Close() {
}
}

// FlushUnderLock is a final flush under an exclusive lock using the connection
// FlushUnderTableLock is a final flush under an exclusive table lock using the connection
// that holds a write lock. Because flushing generates binary log events,
// we actually want to call flush *twice*:
// - The first time flushes the pending changes to the new table.
// - We then ensure that we have all the binary log changes read from the server.
// - The second time reads through the changes generated by the first flush
// and updates the in memory applied position to match the server's position.
// This is required to satisfy the binlog position is updated for the c.AllChangesFlushed() check.
func (c *Client) FlushUnderLock(ctx context.Context, lock *dbconn.TableLock) error {
func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error {
if err := c.flush(ctx, true, lock); err != nil {
return err
}
Expand Down

0 comments on commit f201252

Please sign in to comment.