Skip to content

Commit

Permalink
Have the DB flavor process waiting for a pos
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Dec 10, 2023
1 parent a77344b commit 6aff9d0
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 73 deletions.
32 changes: 14 additions & 18 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,9 @@ type flavor interface {
// with parsed executed position.
primaryStatus(c *Conn) (replication.PrimaryStatus, error)

// waitUntilPositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error)
// waitUntilPosition waits until the given position is reached or
// until the context expires and returns an error if we did not succeed.
waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error

baseShowTables() string
baseShowTablesWithSizes() string
Expand All @@ -166,7 +164,8 @@ type CapableOf func(capability FlavorCapability) (bool, error)
var flavors = make(map[string]func() flavor)

// ServerVersionAtLeast returns true if current server is at least given value.
// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're on MySQL 8.0.23, 8.0.24, ...
// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're
// on MySQL 8.0.23, 8.0.24, ...
func ServerVersionAtLeast(serverVersion string, parts ...int) (bool, error) {
versionPrefix := strings.Split(serverVersion, "-")[0]
versionTokens := strings.Split(versionPrefix, ".")
Expand Down Expand Up @@ -448,21 +447,18 @@ func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) {
return c.flavor.primaryStatus(c)
}

// WaitUntilPositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
func (c *Conn) WaitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
return c.flavor.waitUntilPositionCommand(ctx, pos)
// WaitUntilPosition waits until the given position is reached or until the
// context expires. It returns an error if we did not succeed.
func (c *Conn) WaitUntilPosition(ctx context.Context, pos replication.Position) error {
return c.flavor.waitUntilPosition(ctx, c, pos)
}

// WaitUntilFilePositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires for the file position flavor. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
func (c *Conn) WaitUntilFilePositionCommand(ctx context.Context, pos replication.Position) (string, error) {
// WaitUntilFilePosition waits until the given position is reached or until
// the context expires for the file position flavor. It returns an error if
// we did not succeed.
func (c *Conn) WaitUntilFilePosition(ctx context.Context, pos replication.Position) error {
filePosFlavor := filePosFlavor{}
return filePosFlavor.waitUntilPositionCommand(ctx, pos)
return filePosFlavor.waitUntilPosition(ctx, c, pos)
}

// BaseShowTables returns a query that shows tables
Expand Down
45 changes: 39 additions & 6 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mysql

import (
"context"
"errors"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -266,22 +267,54 @@ func (flv *filePosFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, err
return replication.ParseFilePosPrimaryStatus(resultMap)
}

// waitUntilPositionCommand is part of the Flavor interface.
func (flv *filePosFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
// waitUntilPosition is part of the Flavor interface.
func (flv *filePosFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
filePosPos, ok := pos.GTIDSet.(replication.FilePosGTID)
if !ok {
return "", fmt.Errorf("Position is not filePos compatible: %#v", pos.GTIDSet)
return fmt.Errorf("position is not filePos compatible: %#v", pos.GTIDSet)
}

query := fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos)
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return "", fmt.Errorf("timed out waiting for position %v", pos)
return fmt.Errorf("timed out waiting for position %v", pos)
}
return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds()), nil
query = fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds())
}

return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos), nil
result, err := c.ExecuteFetch(query, 1, false)
if err != nil {
return err
}

// For MASTER_POS_WAIT(), the return value is the number of log events
// the replica had to wait for to advance to the specified position.
// The function returns NULL if the replica SQL thread is not started,
// the replica's source information is not initialized, the arguments
// are incorrect, or an error occurs. It returns -1 if the timeout has
// been exceeded. If the replica SQL thread stops while MASTER_POS_WAIT()
// is waiting, the function returns NULL. If the replica is past the
// specified position, the function returns immediately.
if len(result.Rows) != 1 || len(result.Rows[0]) != 1 {
return errors.New("invalid results")
}
val := result.Rows[0][0]
if val.IsNull() {
return errors.New("replication is not running")
}
state, err := val.ToInt64()
if err != nil {
return fmt.Errorf("invalid result of %v", val)
}
switch {
case state == -1:
return fmt.Errorf("timed out waiting for position %v", pos)
case state >= 0:
return nil
default:
return fmt.Errorf("invalid result of %v", state)
}
}

func (*filePosFlavor) startReplicationUntilAfter(pos replication.Position) string {
Expand Down
38 changes: 31 additions & 7 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mysql

import (
"context"
"errors"
"fmt"
"io"
"time"
Expand Down Expand Up @@ -223,22 +224,45 @@ func (m mariadbFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error)
return status, err
}

// waitUntilPositionCommand is part of the Flavor interface.
// waitUntilPosition is part of the Flavor interface.
//
// Note: Unlike MASTER_POS_WAIT(), MASTER_GTID_WAIT() will continue waiting even
// if the sql thread stops. If that is a problem, we'll have to change this.
func (mariadbFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
func (mariadbFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
// Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means
// return immediately.
query := fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos)
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
return vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
}
return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds()), nil
query = fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds())
}

// Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means
// return immediately.
return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos), nil
result, err := c.ExecuteFetch(query, 1, false)
if err != nil {
return err
}

// For MASTER_GTID_WAIT(), if the wait completes without a timeout 0 is
// returned and -1 if there was a timeout.
if len(result.Rows) != 1 || len(result.Rows[0]) != 1 {
return errors.New("invalid results")
}
val := result.Rows[0][0]
state, err := val.ToInt64()
if err != nil {
return fmt.Errorf("invalid result of %v", val)
}
switch state {
case 0:
return nil
case -1:
return fmt.Errorf("timed out waiting for position %v", pos)
default:
return fmt.Errorf("invalid result of %v", state)
}
}

// readBinlogEvent is part of the Flavor interface.
Expand Down
34 changes: 30 additions & 4 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mysql

import (
"context"
"errors"
"fmt"
"io"
"time"
Expand All @@ -26,6 +27,8 @@ import (
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// mysqlFlavor implements the Flavor interface for Mysql.
Expand Down Expand Up @@ -216,14 +219,14 @@ func (mysqlFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) {
return replication.ParseMysqlPrimaryStatus(resultMap)
}

// waitUntilPositionCommand is part of the Flavor interface.
func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
// waitUntilPosition is part of the Flavor interface.
func (mysqlFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
// A timeout of 0 means wait indefinitely.
timeoutSeconds := 0
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
}

// Only whole numbers of seconds are supported.
Expand All @@ -234,7 +237,30 @@ func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication
}
}

return fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds), nil
query := fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds)
result, err := c.ExecuteFetch(query, 1, false)
if err != nil {
return err
}

// For WAIT_FOR_EXECUTED_GTID_SET(), the return value is the state of the query, where
// 0 represents success, and 1 represents timeout. Any other failures generate an error.
if len(result.Rows) != 1 || len(result.Rows[0]) != 1 {
return errors.New("invalid results")
}
val := result.Rows[0][0]
state, err := val.ToInt64()
if err != nil {
return fmt.Errorf("invalid result of %v", val)
}
switch state {
case 0:
return nil
case 1:
return fmt.Errorf("timed out waiting for position %v", pos)
default:
return fmt.Errorf("invalid result of %v", state)
}
}

// readBinlogEvent is part of the Flavor interface.
Expand Down
49 changes: 11 additions & 38 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ func (mysqld *Mysqld) SetSuperReadOnly(on bool) (ResetSuperReadOnlyFunc, error)
return resetFunc, nil
}

// WaitSourcePos lets replicas wait to given replication position
// WaitSourcePos lets replicas wait for the given replication position to
// be reached.
func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.Position) error {
// Get a connection.
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
Expand All @@ -324,61 +325,33 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P
}
defer conn.Recycle()

// First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection,
// unless that flavor is also filePos.
waitCommandName := "WaitUntilPositionCommand"
var query string
// First check if filePos flavored Position was passed in. If so, we
// can't defer to the flavor in the connection, unless that flavor is
// also filePos.
if targetPos.MatchesFlavor(replication.FilePosFlavorID) {
// If we are the primary, WaitUntilFilePositionCommand will fail.
// But position is most likely reached. So, check the position
// first.
// If we are the primary, WaitUntilFilePosition will fail. But
// position is most likely reached. So, check the position first.
mpos, err := conn.Conn.PrimaryFilePosition()
if err != nil {
return fmt.Errorf("WaitSourcePos: PrimaryFilePosition failed: %v", err)
}
if mpos.AtLeast(targetPos) {
return nil
}

// Find the query to run, run it.
query, err = conn.Conn.WaitUntilFilePositionCommand(ctx, targetPos)
if err != nil {
return err
}
waitCommandName = "WaitUntilFilePositionCommand"
} else {
// If we are the primary, WaitUntilPositionCommand will fail.
// But position is most likely reached. So, check the position
// first.
// If we are the primary, WaitUntilPosition will fail. But
// position is most likely reached. So, check the position first.
mpos, err := conn.Conn.PrimaryPosition()
if err != nil {
return fmt.Errorf("WaitSourcePos: PrimaryPosition failed: %v", err)
}
if mpos.AtLeast(targetPos) {
return nil
}

// Find the query to run, run it.
query, err = conn.Conn.WaitUntilPositionCommand(ctx, targetPos)
if err != nil {
return err
}
}

qr, err := mysqld.FetchSuperQuery(ctx, query)
if err != nil {
return fmt.Errorf("%v(%v) failed: %v", waitCommandName, query, err)
}

if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return fmt.Errorf("unexpected result format from %v(%v): %#v", waitCommandName, query, qr)
}
result := qr.Rows[0][0]
if result.IsNull() {
return fmt.Errorf("%v(%v) failed: replication is probably stopped", waitCommandName, query)
}
if result.ToString() == "-1" {
return fmt.Errorf("timed out waiting for position %v", targetPos)
if err := conn.Conn.WaitUntilPosition(ctx, targetPos); err != nil {
return fmt.Errorf("WaitSourcePos failed: %v", err)
}
return nil
}
Expand Down

0 comments on commit 6aff9d0

Please sign in to comment.