Skip to content

Commit

Permalink
Replication: Have the DB flavor process waiting for a pos (vitessio#1…
Browse files Browse the repository at this point in the history
…4745)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored and ejortegau committed Dec 13, 2023
1 parent 30ee133 commit 76081ef
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 83 deletions.
33 changes: 15 additions & 18 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,10 @@ type flavor interface {
// with parsed executed position.
primaryStatus(c *Conn) (replication.PrimaryStatus, error)

// waitUntilPositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error)
// waitUntilPosition waits until the given position is reached or
// until the context expires. It returns an error if we did not
// succeed.
waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error

baseShowTables() string
baseShowTablesWithSizes() string
Expand All @@ -166,7 +165,8 @@ type CapableOf func(capability FlavorCapability) (bool, error)
var flavors = make(map[string]func() flavor)

// ServerVersionAtLeast returns true if current server is at least given value.
// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're on MySQL 8.0.23, 8.0.24, ...
// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're
// on MySQL 8.0.23, 8.0.24, ...
func ServerVersionAtLeast(serverVersion string, parts ...int) (bool, error) {
versionPrefix := strings.Split(serverVersion, "-")[0]
versionTokens := strings.Split(versionPrefix, ".")
Expand Down Expand Up @@ -448,21 +448,18 @@ func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) {
return c.flavor.primaryStatus(c)
}

// WaitUntilPositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
func (c *Conn) WaitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
return c.flavor.waitUntilPositionCommand(ctx, pos)
// WaitUntilPosition waits until the given position is reached or until the
// context expires. It returns an error if we did not succeed.
func (c *Conn) WaitUntilPosition(ctx context.Context, pos replication.Position) error {
return c.flavor.waitUntilPosition(ctx, c, pos)
}

// WaitUntilFilePositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires for the file position flavor. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
func (c *Conn) WaitUntilFilePositionCommand(ctx context.Context, pos replication.Position) (string, error) {
// WaitUntilFilePosition waits until the given position is reached or until
// the context expires for the file position flavor. It returns an error if
// we did not succeed.
func (c *Conn) WaitUntilFilePosition(ctx context.Context, pos replication.Position) error {
filePosFlavor := filePosFlavor{}
return filePosFlavor.waitUntilPositionCommand(ctx, pos)
return filePosFlavor.waitUntilPosition(ctx, c, pos)
}

// BaseShowTables returns a query that shows tables
Expand Down
44 changes: 38 additions & 6 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,22 +266,54 @@ func (flv *filePosFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, err
return replication.ParseFilePosPrimaryStatus(resultMap)
}

// waitUntilPositionCommand is part of the Flavor interface.
func (flv *filePosFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
// waitUntilPosition is part of the Flavor interface.
func (flv *filePosFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
filePosPos, ok := pos.GTIDSet.(replication.FilePosGTID)
if !ok {
return "", fmt.Errorf("Position is not filePos compatible: %#v", pos.GTIDSet)
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "position is not filePos compatible: %#v", pos.GTIDSet)
}

query := fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos)
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return "", fmt.Errorf("timed out waiting for position %v", pos)
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
}
return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds()), nil
query = fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds())
}

return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos), nil
result, err := c.ExecuteFetch(query, 1, false)
if err != nil {
return err
}

// For MASTER_POS_WAIT(), the return value is the number of log events
// the replica had to wait for to advance to the specified position.
// The function returns NULL if the replica SQL thread is not started,
// the replica's source information is not initialized, the arguments
// are incorrect, or an error occurs. It returns -1 if the timeout has
// been exceeded. If the replica SQL thread stops while MASTER_POS_WAIT()
// is waiting, the function returns NULL. If the replica is past the
// specified position, the function returns immediately.
if len(result.Rows) != 1 || len(result.Rows[0]) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result)
}
val := result.Rows[0][0]
if val.IsNull() {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "replication is not running")
}
state, err := val.ToInt64()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val)
}
switch {
case state == -1:
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
case state >= 0:
return nil
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state)
}
}

func (*filePosFlavor) startReplicationUntilAfter(pos replication.Position) string {
Expand Down
42 changes: 33 additions & 9 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// mariadbFlavor implements the Flavor interface for MariaDB.
Expand All @@ -48,7 +49,7 @@ func (mariadbFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) {
return nil, err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_binlog_pos: %#v", qr)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_binlog_pos: %#v", qr)
}

return replication.ParseMariadbGTIDSet(qr.Rows[0][0].ToString())
Expand Down Expand Up @@ -223,22 +224,45 @@ func (m mariadbFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error)
return status, err
}

// waitUntilPositionCommand is part of the Flavor interface.
// waitUntilPosition is part of the Flavor interface.
//
// Note: Unlike MASTER_POS_WAIT(), MASTER_GTID_WAIT() will continue waiting even
// if the sql thread stops. If that is a problem, we'll have to change this.
func (mariadbFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
func (mariadbFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
// Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means
// return immediately.
query := fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos)
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
}
return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds()), nil
query = fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds())
}

// Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means
// return immediately.
return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos), nil
result, err := c.ExecuteFetch(query, 1, false)
if err != nil {
return err
}

// For MASTER_GTID_WAIT(), if the wait completes without a timeout 0 is
// returned and -1 if there was a timeout.
if len(result.Rows) != 1 || len(result.Rows[0]) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result)
}
val := result.Rows[0][0]
state, err := val.ToInt64()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val)
}
switch state {
case 0:
return nil
case -1:
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state)
}
}

// readBinlogEvent is part of the Flavor interface.
Expand Down
44 changes: 34 additions & 10 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// mysqlFlavor implements the Flavor interface for Mysql.
Expand All @@ -52,7 +53,7 @@ func (mysqlFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) {
return nil, err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_executed: %#v", qr)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_executed: %#v", qr)
}
return replication.ParseMysql56GTIDSet(qr.Rows[0][0].ToString())
}
Expand All @@ -65,7 +66,7 @@ func (mysqlFlavor) purgedGTIDSet(c *Conn) (replication.GTIDSet, error) {
return nil, err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_purged: %#v", qr)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_purged: %#v", qr)
}
return replication.ParseMysql56GTIDSet(qr.Rows[0][0].ToString())
}
Expand All @@ -78,7 +79,7 @@ func (mysqlFlavor) serverUUID(c *Conn) (string, error) {
return "", err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return "", vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for server_uuid: %#v", qr)
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for server_uuid: %#v", qr)
}
return qr.Rows[0][0].ToString(), nil
}
Expand All @@ -90,7 +91,7 @@ func (mysqlFlavor) gtidMode(c *Conn) (string, error) {
return "", err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return "", vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_mode: %#v", qr)
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_mode: %#v", qr)
}
return qr.Rows[0][0].ToString(), nil
}
Expand Down Expand Up @@ -135,7 +136,7 @@ func (mysqlFlavor) startSQLThreadCommand() string {
func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error {
gtidSet, ok := startPos.GTIDSet.(replication.Mysql56GTIDSet)
if !ok {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet)
}

// Build the command.
Expand Down Expand Up @@ -216,14 +217,14 @@ func (mysqlFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) {
return replication.ParseMysqlPrimaryStatus(resultMap)
}

// waitUntilPositionCommand is part of the Flavor interface.
func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
// waitUntilPosition is part of the Flavor interface.
func (mysqlFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
// A timeout of 0 means wait indefinitely.
timeoutSeconds := 0
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
}

// Only whole numbers of seconds are supported.
Expand All @@ -234,7 +235,30 @@ func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication
}
}

return fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds), nil
query := fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds)
result, err := c.ExecuteFetch(query, 1, false)
if err != nil {
return err
}

// For WAIT_FOR_EXECUTED_GTID_SET(), the return value is the state of the query, where
// 0 represents success, and 1 represents timeout. Any other failures generate an error.
if len(result.Rows) != 1 || len(result.Rows[0]) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result)
}
val := result.Rows[0][0]
state, err := val.ToInt64()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val)
}
switch state {
case 0:
return nil
case 1:
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state)
}
}

// readBinlogEvent is part of the Flavor interface.
Expand Down
54 changes: 14 additions & 40 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/hook"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"
)

type ResetSuperReadOnlyFunc func() error
Expand Down Expand Up @@ -315,7 +316,8 @@ func (mysqld *Mysqld) SetSuperReadOnly(on bool) (ResetSuperReadOnlyFunc, error)
return resetFunc, nil
}

// WaitSourcePos lets replicas wait to given replication position
// WaitSourcePos lets replicas wait for the given replication position to
// be reached.
func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.Position) error {
// Get a connection.
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
Expand All @@ -324,61 +326,33 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P
}
defer conn.Recycle()

// First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection,
// unless that flavor is also filePos.
waitCommandName := "WaitUntilPositionCommand"
var query string
// First check if filePos flavored Position was passed in. If so, we
// can't defer to the flavor in the connection, unless that flavor is
// also filePos.
if targetPos.MatchesFlavor(replication.FilePosFlavorID) {
// If we are the primary, WaitUntilFilePositionCommand will fail.
// But position is most likely reached. So, check the position
// first.
// If we are the primary, WaitUntilFilePosition will fail. But
// position is most likely reached. So, check the position first.
mpos, err := conn.Conn.PrimaryFilePosition()
if err != nil {
return fmt.Errorf("WaitSourcePos: PrimaryFilePosition failed: %v", err)
return vterrors.Wrapf(err, "WaitSourcePos: PrimaryFilePosition failed")
}
if mpos.AtLeast(targetPos) {
return nil
}

// Find the query to run, run it.
query, err = conn.Conn.WaitUntilFilePositionCommand(ctx, targetPos)
if err != nil {
return err
}
waitCommandName = "WaitUntilFilePositionCommand"
} else {
// If we are the primary, WaitUntilPositionCommand will fail.
// But position is most likely reached. So, check the position
// first.
// If we are the primary, WaitUntilPosition will fail. But
// position is most likely reached. So, check the position first.
mpos, err := conn.Conn.PrimaryPosition()
if err != nil {
return fmt.Errorf("WaitSourcePos: PrimaryPosition failed: %v", err)
return vterrors.Wrapf(err, "WaitSourcePos: PrimaryPosition failed")
}
if mpos.AtLeast(targetPos) {
return nil
}

// Find the query to run, run it.
query, err = conn.Conn.WaitUntilPositionCommand(ctx, targetPos)
if err != nil {
return err
}
}

qr, err := mysqld.FetchSuperQuery(ctx, query)
if err != nil {
return fmt.Errorf("%v(%v) failed: %v", waitCommandName, query, err)
}

if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return fmt.Errorf("unexpected result format from %v(%v): %#v", waitCommandName, query, qr)
}
result := qr.Rows[0][0]
if result.IsNull() {
return fmt.Errorf("%v(%v) failed: replication is probably stopped", waitCommandName, query)
}
if result.ToString() == "-1" {
return fmt.Errorf("timed out waiting for position %v", targetPos)
if err := conn.Conn.WaitUntilPosition(ctx, targetPos); err != nil {
return vterrors.Wrapf(err, "WaitSourcePos failed")
}
return nil
}
Expand Down

0 comments on commit 76081ef

Please sign in to comment.