From 6aff9d0a98fd9e0a03dcfd954450de2c7ae776f0 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 10 Dec 2023 13:55:43 -0500 Subject: [PATCH 1/2] Have the DB flavor process waiting for a pos Signed-off-by: Matt Lord --- go/mysql/flavor.go | 32 ++++++++++------------- go/mysql/flavor_filepos.go | 45 +++++++++++++++++++++++++++----- go/mysql/flavor_mariadb.go | 38 ++++++++++++++++++++++----- go/mysql/flavor_mysql.go | 34 +++++++++++++++++++++--- go/vt/mysqlctl/replication.go | 49 ++++++++--------------------------- 5 files changed, 125 insertions(+), 73 deletions(-) diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index a8c2de2e114..09fae3e7c2f 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -146,11 +146,9 @@ type flavor interface { // with parsed executed position. primaryStatus(c *Conn) (replication.PrimaryStatus, error) - // waitUntilPositionCommand returns the SQL command to issue - // to wait until the given position, until the context - // expires. The command returns -1 if it times out. It - // returns NULL if GTIDs are not enabled. - waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) + // waitUntilPosition waits until the given position is reached or + // until the context expires and returns an error if we did not succeed. + waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error baseShowTables() string baseShowTablesWithSizes() string @@ -166,7 +164,8 @@ type CapableOf func(capability FlavorCapability) (bool, error) var flavors = make(map[string]func() flavor) // ServerVersionAtLeast returns true if current server is at least given value. -// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're on MySQL 8.0.23, 8.0.24, ... +// Example: if input is []int{8, 0, 23}... the function returns 'true' if we're +// on MySQL 8.0.23, 8.0.24, ... func ServerVersionAtLeast(serverVersion string, parts ...int) (bool, error) { versionPrefix := strings.Split(serverVersion, "-")[0] versionTokens := strings.Split(versionPrefix, ".") @@ -448,21 +447,18 @@ func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) { return c.flavor.primaryStatus(c) } -// WaitUntilPositionCommand returns the SQL command to issue -// to wait until the given position, until the context -// expires. The command returns -1 if it times out. It -// returns NULL if GTIDs are not enabled. -func (c *Conn) WaitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) { - return c.flavor.waitUntilPositionCommand(ctx, pos) +// WaitUntilPosition waits until the given position is reached or until the +// context expires. It returns an error if we did not succeed. +func (c *Conn) WaitUntilPosition(ctx context.Context, pos replication.Position) error { + return c.flavor.waitUntilPosition(ctx, c, pos) } -// WaitUntilFilePositionCommand returns the SQL command to issue -// to wait until the given position, until the context -// expires for the file position flavor. The command returns -1 if it times out. It -// returns NULL if GTIDs are not enabled. -func (c *Conn) WaitUntilFilePositionCommand(ctx context.Context, pos replication.Position) (string, error) { +// WaitUntilFilePosition waits until the given position is reached or until +// the context expires for the file position flavor. It returns an error if +// we did not succeed. +func (c *Conn) WaitUntilFilePosition(ctx context.Context, pos replication.Position) error { filePosFlavor := filePosFlavor{} - return filePosFlavor.waitUntilPositionCommand(ctx, pos) + return filePosFlavor.waitUntilPosition(ctx, c, pos) } // BaseShowTables returns a query that shows tables diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index bf4076b85b1..882e27ea092 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -18,6 +18,7 @@ package mysql import ( "context" + "errors" "fmt" "io" "strings" @@ -266,22 +267,54 @@ func (flv *filePosFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, err return replication.ParseFilePosPrimaryStatus(resultMap) } -// waitUntilPositionCommand is part of the Flavor interface. -func (flv *filePosFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) { +// waitUntilPosition is part of the Flavor interface. +func (flv *filePosFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error { filePosPos, ok := pos.GTIDSet.(replication.FilePosGTID) if !ok { - return "", fmt.Errorf("Position is not filePos compatible: %#v", pos.GTIDSet) + return fmt.Errorf("position is not filePos compatible: %#v", pos.GTIDSet) } + query := fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos) if deadline, ok := ctx.Deadline(); ok { timeout := time.Until(deadline) if timeout <= 0 { - return "", fmt.Errorf("timed out waiting for position %v", pos) + return fmt.Errorf("timed out waiting for position %v", pos) } - return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds()), nil + query = fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds()) } - return fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos), nil + result, err := c.ExecuteFetch(query, 1, false) + if err != nil { + return err + } + + // For MASTER_POS_WAIT(), the return value is the number of log events + // the replica had to wait for to advance to the specified position. + // The function returns NULL if the replica SQL thread is not started, + // the replica's source information is not initialized, the arguments + // are incorrect, or an error occurs. It returns -1 if the timeout has + // been exceeded. If the replica SQL thread stops while MASTER_POS_WAIT() + // is waiting, the function returns NULL. If the replica is past the + // specified position, the function returns immediately. + if len(result.Rows) != 1 || len(result.Rows[0]) != 1 { + return errors.New("invalid results") + } + val := result.Rows[0][0] + if val.IsNull() { + return errors.New("replication is not running") + } + state, err := val.ToInt64() + if err != nil { + return fmt.Errorf("invalid result of %v", val) + } + switch { + case state == -1: + return fmt.Errorf("timed out waiting for position %v", pos) + case state >= 0: + return nil + default: + return fmt.Errorf("invalid result of %v", state) + } } func (*filePosFlavor) startReplicationUntilAfter(pos replication.Position) string { diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 15718542b45..37f1d7c924f 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -19,6 +19,7 @@ package mysql import ( "context" + "errors" "fmt" "io" "time" @@ -223,22 +224,45 @@ func (m mariadbFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) return status, err } -// waitUntilPositionCommand is part of the Flavor interface. +// waitUntilPosition is part of the Flavor interface. // // Note: Unlike MASTER_POS_WAIT(), MASTER_GTID_WAIT() will continue waiting even // if the sql thread stops. If that is a problem, we'll have to change this. -func (mariadbFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) { +func (mariadbFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error { + // Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means + // return immediately. + query := fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos) if deadline, ok := ctx.Deadline(); ok { timeout := time.Until(deadline) if timeout <= 0 { - return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) } - return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds()), nil + query = fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds()) } - // Omit the timeout to wait indefinitely. In MariaDB, a timeout of 0 means - // return immediately. - return fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s')", pos), nil + result, err := c.ExecuteFetch(query, 1, false) + if err != nil { + return err + } + + // For MASTER_GTID_WAIT(), if the wait completes without a timeout 0 is + // returned and -1 if there was a timeout. + if len(result.Rows) != 1 || len(result.Rows[0]) != 1 { + return errors.New("invalid results") + } + val := result.Rows[0][0] + state, err := val.ToInt64() + if err != nil { + return fmt.Errorf("invalid result of %v", val) + } + switch state { + case 0: + return nil + case -1: + return fmt.Errorf("timed out waiting for position %v", pos) + default: + return fmt.Errorf("invalid result of %v", state) + } } // readBinlogEvent is part of the Flavor interface. diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index be11126ff9c..8c5e06a719f 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -18,6 +18,7 @@ package mysql import ( "context" + "errors" "fmt" "io" "time" @@ -26,6 +27,8 @@ import ( "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // mysqlFlavor implements the Flavor interface for Mysql. @@ -216,14 +219,14 @@ func (mysqlFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) { return replication.ParseMysqlPrimaryStatus(resultMap) } -// waitUntilPositionCommand is part of the Flavor interface. -func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication.Position) (string, error) { +// waitUntilPosition is part of the Flavor interface. +func (mysqlFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error { // A timeout of 0 means wait indefinitely. timeoutSeconds := 0 if deadline, ok := ctx.Deadline(); ok { timeout := time.Until(deadline) if timeout <= 0 { - return "", vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) } // Only whole numbers of seconds are supported. @@ -234,7 +237,30 @@ func (mysqlFlavor) waitUntilPositionCommand(ctx context.Context, pos replication } } - return fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds), nil + query := fmt.Sprintf("SELECT WAIT_FOR_EXECUTED_GTID_SET('%s', %v)", pos, timeoutSeconds) + result, err := c.ExecuteFetch(query, 1, false) + if err != nil { + return err + } + + // For WAIT_FOR_EXECUTED_GTID_SET(), the return value is the state of the query, where + // 0 represents success, and 1 represents timeout. Any other failures generate an error. + if len(result.Rows) != 1 || len(result.Rows[0]) != 1 { + return errors.New("invalid results") + } + val := result.Rows[0][0] + state, err := val.ToInt64() + if err != nil { + return fmt.Errorf("invalid result of %v", val) + } + switch state { + case 0: + return nil + case 1: + return fmt.Errorf("timed out waiting for position %v", pos) + default: + return fmt.Errorf("invalid result of %v", state) + } } // readBinlogEvent is part of the Flavor interface. diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 23b19669f16..5dcbcccbd6c 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -315,7 +315,8 @@ func (mysqld *Mysqld) SetSuperReadOnly(on bool) (ResetSuperReadOnlyFunc, error) return resetFunc, nil } -// WaitSourcePos lets replicas wait to given replication position +// WaitSourcePos lets replicas wait for the given replication position to +// be reached. func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.Position) error { // Get a connection. conn, err := getPoolReconnect(ctx, mysqld.dbaPool) @@ -324,14 +325,12 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P } defer conn.Recycle() - // First check if filePos flavored Position was passed in. If so, we can't defer to the flavor in the connection, - // unless that flavor is also filePos. - waitCommandName := "WaitUntilPositionCommand" - var query string + // First check if filePos flavored Position was passed in. If so, we + // can't defer to the flavor in the connection, unless that flavor is + // also filePos. if targetPos.MatchesFlavor(replication.FilePosFlavorID) { - // If we are the primary, WaitUntilFilePositionCommand will fail. - // But position is most likely reached. So, check the position - // first. + // If we are the primary, WaitUntilFilePosition will fail. But + // position is most likely reached. So, check the position first. mpos, err := conn.Conn.PrimaryFilePosition() if err != nil { return fmt.Errorf("WaitSourcePos: PrimaryFilePosition failed: %v", err) @@ -339,17 +338,9 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P if mpos.AtLeast(targetPos) { return nil } - - // Find the query to run, run it. - query, err = conn.Conn.WaitUntilFilePositionCommand(ctx, targetPos) - if err != nil { - return err - } - waitCommandName = "WaitUntilFilePositionCommand" } else { - // If we are the primary, WaitUntilPositionCommand will fail. - // But position is most likely reached. So, check the position - // first. + // If we are the primary, WaitUntilPosition will fail. But + // position is most likely reached. So, check the position first. mpos, err := conn.Conn.PrimaryPosition() if err != nil { return fmt.Errorf("WaitSourcePos: PrimaryPosition failed: %v", err) @@ -357,28 +348,10 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P if mpos.AtLeast(targetPos) { return nil } - - // Find the query to run, run it. - query, err = conn.Conn.WaitUntilPositionCommand(ctx, targetPos) - if err != nil { - return err - } } - qr, err := mysqld.FetchSuperQuery(ctx, query) - if err != nil { - return fmt.Errorf("%v(%v) failed: %v", waitCommandName, query, err) - } - - if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return fmt.Errorf("unexpected result format from %v(%v): %#v", waitCommandName, query, qr) - } - result := qr.Rows[0][0] - if result.IsNull() { - return fmt.Errorf("%v(%v) failed: replication is probably stopped", waitCommandName, query) - } - if result.ToString() == "-1" { - return fmt.Errorf("timed out waiting for position %v", targetPos) + if err := conn.Conn.WaitUntilPosition(ctx, targetPos); err != nil { + return fmt.Errorf("WaitSourcePos failed: %v", err) } return nil } From 4581f36ac3b8dc419b59c47be03e5653e51f0337 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 10 Dec 2023 16:24:58 -0500 Subject: [PATCH 2/2] Improve error handling Signed-off-by: Matt Lord --- go/mysql/flavor.go | 3 ++- go/mysql/flavor_filepos.go | 15 +++++++-------- go/mysql/flavor_mariadb.go | 16 ++++++++-------- go/mysql/flavor_mysql.go | 20 +++++++++----------- go/vt/mysqlctl/replication.go | 7 ++++--- 5 files changed, 30 insertions(+), 31 deletions(-) diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index 09fae3e7c2f..2c6fb2b867f 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -147,7 +147,8 @@ type flavor interface { primaryStatus(c *Conn) (replication.PrimaryStatus, error) // waitUntilPosition waits until the given position is reached or - // until the context expires and returns an error if we did not succeed. + // until the context expires. It returns an error if we did not + // succeed. waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error baseShowTables() string diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index 882e27ea092..96939faf3c4 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -18,7 +18,6 @@ package mysql import ( "context" - "errors" "fmt" "io" "strings" @@ -271,14 +270,14 @@ func (flv *filePosFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, err func (flv *filePosFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error { filePosPos, ok := pos.GTIDSet.(replication.FilePosGTID) if !ok { - return fmt.Errorf("position is not filePos compatible: %#v", pos.GTIDSet) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "position is not filePos compatible: %#v", pos.GTIDSet) } query := fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d)", filePosPos.File, filePosPos.Pos) if deadline, ok := ctx.Deadline(); ok { timeout := time.Until(deadline) if timeout <= 0 { - return fmt.Errorf("timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) } query = fmt.Sprintf("SELECT MASTER_POS_WAIT('%s', %d, %.6f)", filePosPos.File, filePosPos.Pos, timeout.Seconds()) } @@ -297,23 +296,23 @@ func (flv *filePosFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos re // is waiting, the function returns NULL. If the replica is past the // specified position, the function returns immediately. if len(result.Rows) != 1 || len(result.Rows[0]) != 1 { - return errors.New("invalid results") + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result) } val := result.Rows[0][0] if val.IsNull() { - return errors.New("replication is not running") + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "replication is not running") } state, err := val.ToInt64() if err != nil { - return fmt.Errorf("invalid result of %v", val) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val) } switch { case state == -1: - return fmt.Errorf("timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) case state >= 0: return nil default: - return fmt.Errorf("invalid result of %v", state) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state) } } diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 37f1d7c924f..77b1b4f1399 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -19,15 +19,15 @@ package mysql import ( "context" - "errors" "fmt" "io" "time" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // mariadbFlavor implements the Flavor interface for MariaDB. @@ -49,7 +49,7 @@ func (mariadbFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) { return nil, err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_binlog_pos: %#v", qr) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_binlog_pos: %#v", qr) } return replication.ParseMariadbGTIDSet(qr.Rows[0][0].ToString()) @@ -235,7 +235,7 @@ func (mariadbFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replica if deadline, ok := ctx.Deadline(); ok { timeout := time.Until(deadline) if timeout <= 0 { - return vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) } query = fmt.Sprintf("SELECT MASTER_GTID_WAIT('%s', %.6f)", pos, timeout.Seconds()) } @@ -248,20 +248,20 @@ func (mariadbFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replica // For MASTER_GTID_WAIT(), if the wait completes without a timeout 0 is // returned and -1 if there was a timeout. if len(result.Rows) != 1 || len(result.Rows[0]) != 1 { - return errors.New("invalid results") + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result) } val := result.Rows[0][0] state, err := val.ToInt64() if err != nil { - return fmt.Errorf("invalid result of %v", val) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val) } switch state { case 0: return nil case -1: - return fmt.Errorf("timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) default: - return fmt.Errorf("invalid result of %v", state) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state) } } diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index 8c5e06a719f..a3a449f5490 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -18,14 +18,12 @@ package mysql import ( "context" - "errors" "fmt" "io" "time" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -55,7 +53,7 @@ func (mysqlFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) { return nil, err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_executed: %#v", qr) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_executed: %#v", qr) } return replication.ParseMysql56GTIDSet(qr.Rows[0][0].ToString()) } @@ -68,7 +66,7 @@ func (mysqlFlavor) purgedGTIDSet(c *Conn) (replication.GTIDSet, error) { return nil, err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_purged: %#v", qr) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_purged: %#v", qr) } return replication.ParseMysql56GTIDSet(qr.Rows[0][0].ToString()) } @@ -81,7 +79,7 @@ func (mysqlFlavor) serverUUID(c *Conn) (string, error) { return "", err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return "", vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for server_uuid: %#v", qr) + return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for server_uuid: %#v", qr) } return qr.Rows[0][0].ToString(), nil } @@ -93,7 +91,7 @@ func (mysqlFlavor) gtidMode(c *Conn) (string, error) { return "", err } if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return "", vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected result format for gtid_mode: %#v", qr) + return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for gtid_mode: %#v", qr) } return qr.Rows[0][0].ToString(), nil } @@ -138,7 +136,7 @@ func (mysqlFlavor) startSQLThreadCommand() string { func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error { gtidSet, ok := startPos.GTIDSet.(replication.Mysql56GTIDSet) if !ok { - return vterrors.Errorf(vtrpc.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet) } // Build the command. @@ -246,20 +244,20 @@ func (mysqlFlavor) waitUntilPosition(ctx context.Context, c *Conn, pos replicati // For WAIT_FOR_EXECUTED_GTID_SET(), the return value is the state of the query, where // 0 represents success, and 1 represents timeout. Any other failures generate an error. if len(result.Rows) != 1 || len(result.Rows[0]) != 1 { - return errors.New("invalid results") + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid results: %#v", result) } val := result.Rows[0][0] state, err := val.ToInt64() if err != nil { - return fmt.Errorf("invalid result of %v", val) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %#v", val) } switch state { case 0: return nil case 1: - return fmt.Errorf("timed out waiting for position %v", pos) + return vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "timed out waiting for position %v", pos) default: - return fmt.Errorf("invalid result of %v", state) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid result of %d", state) } } diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 5dcbcccbd6c..1dd03d901cb 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/hook" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vterrors" ) type ResetSuperReadOnlyFunc func() error @@ -333,7 +334,7 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P // position is most likely reached. So, check the position first. mpos, err := conn.Conn.PrimaryFilePosition() if err != nil { - return fmt.Errorf("WaitSourcePos: PrimaryFilePosition failed: %v", err) + return vterrors.Wrapf(err, "WaitSourcePos: PrimaryFilePosition failed") } if mpos.AtLeast(targetPos) { return nil @@ -343,7 +344,7 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P // position is most likely reached. So, check the position first. mpos, err := conn.Conn.PrimaryPosition() if err != nil { - return fmt.Errorf("WaitSourcePos: PrimaryPosition failed: %v", err) + return vterrors.Wrapf(err, "WaitSourcePos: PrimaryPosition failed") } if mpos.AtLeast(targetPos) { return nil @@ -351,7 +352,7 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P } if err := conn.Conn.WaitUntilPosition(ctx, targetPos); err != nil { - return fmt.Errorf("WaitSourcePos failed: %v", err) + return vterrors.Wrapf(err, "WaitSourcePos failed") } return nil }