Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication: Have the DB flavor process waiting for a pos #14745

Merged
merged 2 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,10 @@ type flavor interface {
// with parsed executed position.
primaryStatus(c *Conn) (replication.PrimaryStatus, error)

// waitUntilPositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error)
// waitUntilPosition waits until the given position is reached or
// until the context expires. It returns an error if we did not
// succeed.
waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error

baseShowTables() string
baseShowTablesWithSizes() string
Expand All @@ -166,7 +165,8 @@ type CapableOf func(capability FlavorCapability) (bool, error)
var flavors = make(map[string]func() flavor)

// ServerVersionAtLeast returns true if current server is at least given value.
// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're on MySQL 8.0.23, 8.0.24, ...
// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're
// on MySQL 8.0.23, 8.0.24, ...
func ServerVersionAtLeast(serverVersion string, parts ...int) (bool, error) {
versionPrefix := strings.Split(serverVersion, "-")[0]
versionTokens := strings.Split(versionPrefix, ".")
Expand Down Expand Up @@ -448,21 +448,18 @@ func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) {
return c.flavor.primaryStatus(c)
}

// WaitUntilPositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
func (c *Conn) WaitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
return c.flavor.waitUntilPositionCommand(ctx, pos)
// WaitUntilPosition waits until the given position is reached or until the
// context expires. It returns an error if we did not succeed.
func (c *Conn) WaitUntilPosition(ctx context.Context, pos replication.Position) error {
return c.flavor.waitUntilPosition(ctx, c, pos)
}

// WaitUntilFilePositionCommand returns the SQL command to issue
// to wait until the given position, until the context
// expires for the file position flavor. The command returns -1 if it times out. It
// returns NULL if GTIDs are not enabled.
func (c *Conn) WaitUntilFilePositionCommand(ctx context.Context, pos replication.Position) (string, error) {
// WaitUntilFilePosition waits until the given position is reached or until
// the context expires for the file position flavor. It returns an error if
// we did not succeed.
func (c *Conn) WaitUntilFilePosition(ctx context.Context, pos replication.Position) error {
filePosFlavor := filePosFlavor{}
return filePosFlavor.waitUntilPositionCommand(ctx, pos)
return filePosFlavor.waitUntilPosition(ctx, c, pos)
}

// BaseShowTables returns a query that shows tables
Expand Down
44 changes: 38 additions & 6 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,22 +266,54 @@ func (flv *filePosFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, err
return replication.ParseFilePosPrimaryStatus(resultMap)
}

// waitUntilPositionCommand is part of the Flavor interface.
func (flv *filePosFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
// waitUntilPosition is part of the Flavor interface.
func (flv *filePosFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
filePosPos, ok := pos.GTIDSet.(replication.FilePosGTID)
if !ok {
return "", fmt.Errorf("Position is not filePos compatible: %#v", pos.GTIDSet)
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "position is not filePos compatible: %#v", pos.GTIDSet)
}

query := fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos)
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return "", fmt.Errorf("timed out waiting for position %v", pos)
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
}
return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds()), nil
query = fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds())
}

return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos), nil
result, err := c.ExecuteFetch(query, 1, false)
if err != nil {
return err
}

// For MASTER_POS_WAIT(), the return value is the number of log events
// the replica had to wait for to advance to the specified position.
// The function returns NULL if the replica SQL thread is not started,
// the replica's source information is not initialized, the arguments
// are incorrect, or an error occurs. It returns -1 if the timeout has
// been exceeded. If the replica SQL thread stops while MASTER_POS_WAIT()
// is waiting, the function returns NULL. If the replica is past the
// specified position, the function returns immediately.
if len(result.Rows) != 1 || len(result.Rows[0]) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result)
}
val := result.Rows[0][0]
if val.IsNull() {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "replication is not running")
}
state, err := val.ToInt64()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val)
}
switch {
case state == -1:
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
case state >= 0:
return nil
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state)
}
}

func (*filePosFlavor) startReplicationUntilAfter(pos replication.Position) string {
Expand Down
42 changes: 33 additions & 9 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// mariadbFlavor implements the Flavor interface for MariaDB.
Expand All @@ -48,7 +49,7 @@ func (mariadbFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) {
return nil, err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_binlog_pos: %#v", qr)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_binlog_pos: %#v", qr)
}

return replication.ParseMariadbGTIDSet(qr.Rows[0][0].ToString())
Expand Down Expand Up @@ -223,22 +224,45 @@ func (m mariadbFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error)
return status, err
}

// waitUntilPositionCommand is part of the Flavor interface.
// waitUntilPosition is part of the Flavor interface.
//
// Note: Unlike MASTER_POS_WAIT(), MASTER_GTID_WAIT() will continue waiting even
// if the SQL thread stops. If that is a problem, we'll have to change this.
func (mariadbFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
func (mariadbFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
// Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means
// return immediately.
query := fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos)
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
}
return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds()), nil
query = fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds())
}

// Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means
// return immediately.
return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos), nil
result, err := c.ExecuteFetch(query, 1, false)
if err != nil {
return err
}

// MASTER_GTID_WAIT() returns 0 if the wait completes before the timeout
// and -1 if the timeout was exceeded.
if len(result.Rows) != 1 || len(result.Rows[0]) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result)
}
val := result.Rows[0][0]
state, err := val.ToInt64()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val)
}
switch state {
case 0:
return nil
case -1:
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state)
}
}

// readBinlogEvent is part of the Flavor interface.
Expand Down
44 changes: 34 additions & 10 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// mysqlFlavor implements the Flavor interface for MySQL.
Expand All @@ -52,7 +53,7 @@ func (mysqlFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) {
return nil, err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_executed: %#v", qr)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_executed: %#v", qr)
}
return replication.ParseMysql56GTIDSet(qr.Rows[0][0].ToString())
}
Expand All @@ -65,7 +66,7 @@ func (mysqlFlavor) purgedGTIDSet(c *Conn) (replication.GTIDSet, error) {
return nil, err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_purged: %#v", qr)
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_purged: %#v", qr)
}
return replication.ParseMysql56GTIDSet(qr.Rows[0][0].ToString())
}
Expand All @@ -78,7 +79,7 @@ func (mysqlFlavor) serverUUID(c *Conn) (string, error) {
return "", err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return "", vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for server_uuid: %#v", qr)
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for server_uuid: %#v", qr)
}
return qr.Rows[0][0].ToString(), nil
}
Expand All @@ -90,7 +91,7 @@ func (mysqlFlavor) gtidMode(c *Conn) (string, error) {
return "", err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return "", vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_mode: %#v", qr)
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_mode: %#v", qr)
}
return qr.Rows[0][0].ToString(), nil
}
Expand Down Expand Up @@ -135,7 +136,7 @@ func (mysqlFlavor) startSQLThreadCommand() string {
func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error {
gtidSet, ok := startPos.GTIDSet.(replication.Mysql56GTIDSet)
if !ok {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet)
}

// Build the command.
Expand Down Expand Up @@ -216,14 +217,14 @@ func (mysqlFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) {
return replication.ParseMysqlPrimaryStatus(resultMap)
}

// waitUntilPositionCommand is part of the Flavor interface.
func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) {
// waitUntilPosition is part of the Flavor interface.
func (mysqlFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error {
// A timeout of 0 means wait indefinitely.
timeoutSeconds := 0
if deadline, ok := ctx.Deadline(); ok {
timeout := time.Until(deadline)
if timeout <= 0 {
return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
}

// Only whole numbers of seconds are supported.
Expand All @@ -234,7 +235,30 @@ func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication
}
}

return fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds), nil
query := fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds)
result, err := c.ExecuteFetch(query, 1, false)
if err != nil {
return err
}

// For WAIT_FOR_EXECUTED_GTID_SET(), the return value is the state of the query, where
// 0 represents success, and 1 represents timeout. Any other failures generate an error.
if len(result.Rows) != 1 || len(result.Rows[0]) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result)
}
val := result.Rows[0][0]
state, err := val.ToInt64()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val)
}
switch state {
case 0:
return nil
case 1:
return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos)
default:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state)
}
}

// readBinlogEvent is part of the Flavor interface.
Expand Down
54 changes: 14 additions & 40 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/hook"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"
)

type ResetSuperReadOnlyFunc func() error
Expand Down Expand Up @@ -315,7 +316,8 @@ func (mysqld *Mysqld) SetSuperReadOnly(on bool) (ResetSuperReadOnlyFunc, error)
return resetFunc, nil
}

// WaitSourcePos lets replicas wait to given replication position
// WaitSourcePos lets replicas wait for the given replication position to
// be reached.
func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.Position) error {
// Get a connection.
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
Expand All @@ -324,61 +326,33 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P
}
defer conn.Recycle()

// First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection,
// unless that flavor is also filePos.
waitCommandName := "WaitUntilPositionCommand"
var query string
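// First check if a filePos-flavored Position was passed in. If so, we
// can't defer to the flavor in the connection, unless that flavor is
// also filePos.
// NOTE(review): the wait at the end of this function goes through
// conn.Conn.WaitUntilPosition, which uses the connection's flavor, and
// WaitUntilFilePosition is not called here - confirm this is intended.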
if targetPos.MatchesFlavor(replication.FilePosFlavorID) {
// If we are the primary, WaitUntilFilePositionCommand will fail.
// But position is most likely reached. So, check the position
// first.
// If we are the primary, WaitUntilFilePosition will fail. But the
// position has most likely been reached. So, check the position first.
mpos, err := conn.Conn.PrimaryFilePosition()
if err != nil {
return fmt.Errorf("WaitSourcePos: PrimaryFilePosition failed: %v", err)
return vterrors.Wrapf(err, "WaitSourcePos: PrimaryFilePosition failed")
}
if mpos.AtLeast(targetPos) {
return nil
}

// Find the query to run, run it.
query, err = conn.Conn.WaitUntilFilePositionCommand(ctx, targetPos)
if err != nil {
return err
}
waitCommandName = "WaitUntilFilePositionCommand"
} else {
// If we are the primary, WaitUntilPositionCommand will fail.
// But position is most likely reached. So, check the position
// first.
// If we are the primary, WaitUntilPosition will fail. But the
// position has most likely been reached. So, check the position first.
mpos, err := conn.Conn.PrimaryPosition()
if err != nil {
return fmt.Errorf("WaitSourcePos: PrimaryPosition failed: %v", err)
return vterrors.Wrapf(err, "WaitSourcePos: PrimaryPosition failed")
}
if mpos.AtLeast(targetPos) {
return nil
}

// Find the query to run, run it.
query, err = conn.Conn.WaitUntilPositionCommand(ctx, targetPos)
if err != nil {
return err
}
}

qr, err := mysqld.FetchSuperQuery(ctx, query)
if err != nil {
return fmt.Errorf("%v(%v) failed: %v", waitCommandName, query, err)
}

if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 {
return fmt.Errorf("unexpected result format from %v(%v): %#v", waitCommandName, query, qr)
}
result := qr.Rows[0][0]
if result.IsNull() {
return fmt.Errorf("%v(%v) failed: replication is probably stopped", waitCommandName, query)
}
if result.ToString() == "-1" {
return fmt.Errorf("timed out waiting for position %v", targetPos)
if err := conn.Conn.WaitUntilPosition(ctx, targetPos); err != nil {
return vterrors.Wrapf(err, "WaitSourcePos failed")
}
return nil
}
Expand Down
Loading