Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

tracker: get some session variables from downtream (#1032) #1071

Merged
merged 1 commit into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ ErrSchemaTrackerCannotParseDownstreamTable,[code=44008:class=schema-tracker:scop
ErrSchemaTrackerInvalidCreateTableStmt,[code=44009:class=schema-tracker:scope=internal:level=medium], "Message: %s is not a valid `CREATE TABLE` statement"
ErrSchemaTrackerRestoreStmtFail,[code=44010:class=schema-tracker:scope=internal:level=medium], "Message: fail to restore the statement"
ErrSchemaTrackerCannotDropTable,[code=44011:class=schema-tracker:scope=internal:level=high], "Message: failed to drop table for `%s`.`%s` in schema tracker"
ErrSchemaTrackerInit,[code=44012:class=schema-tracker:scope=internal:level=high], "Message: failed to create schema tracker"
ErrSchedulerNotStarted,[code=46001:class=scheduler:scope=internal:level=high], "Message: the scheduler has not started"
ErrSchedulerStarted,[code=46002:class=scheduler:scope=internal:level=medium], "Message: the scheduler has already started"
ErrSchedulerWorkerExist,[code=46003:class=scheduler:scope=internal:level=medium], "Message: dm-worker with name %s already exists"
Expand Down
13 changes: 5 additions & 8 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ func (c *RawDBConfig) SetMaxIdleConns(value int) *RawDBConfig {

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
// deprecated, mysql driver could automatically fetch this value
MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`
Session map[string]string `toml:"session" json:"session" yaml:"session"`

Expand Down Expand Up @@ -121,10 +122,6 @@ func (db *DBConfig) Decode(data string) error {

// Adjust adjusts the config.
func (db *DBConfig) Adjust() {
if db.MaxAllowedPacket == nil {
cloneV := defaultMaxAllowedPacket
db.MaxAllowedPacket = &cloneV
}
}

// SubTaskConfig is the configuration for SubTask
Expand Down
2 changes: 2 additions & 0 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
case stage, ok := <-stageCh:
if !ok {
closed = true
break
}
opType, err := w.operateSubTaskStageWithoutConfig(stage)
if err != nil {
Expand All @@ -422,6 +423,7 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
case err, ok := <-errCh:
if !ok {
closed = true
break
}
// TODO: deal with err
log.L().Error("WatchSubTaskStage received an error", zap.Error(err))
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2764,6 +2764,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-schema-tracker-44012]
message = "failed to create schema tracker"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-scheduler-46001]
message = "the scheduler has not started"
description = ""
Expand Down
4 changes: 2 additions & 2 deletions loader/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
printStatusInterval = time.Second * 5
)

// Status implements SubTaskUnit.Status
// Status implements Unit.Status
func (l *Loader) Status() interface{} {
finishedSize := l.finishedDataSize.Get()
totalSize := l.totalDataSize.Get()
Expand All @@ -41,7 +41,7 @@ func (l *Loader) Status() interface{} {
return s
}

// Error implements SubTaskUnit.Error
// Error implements Unit.Error
func (l *Loader) Error() interface{} {
return &pb.LoadError{}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/conn/basedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ var mockDB sqlmock.Sqlmock

// Apply will build BaseDB with DBConfig
func (d *DefaultDBProviderImpl) Apply(config config.DBConfig) (*BaseDB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=%d",
config.User, config.Password, config.Host, config.Port, *config.MaxAllowedPacket)
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0",
config.User, config.Password, config.Host, config.Port)

doFuncInClose := func() {}
if config.Security != nil && len(config.Security.SSLCA) != 0 &&
Expand Down
1 change: 1 addition & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
"unsupported drop integer primary key",
"Unsupported collation",
"Invalid default value for",
"Unsupported drop primary key",
}

// UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery
Expand Down
43 changes: 41 additions & 2 deletions pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,67 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
tidbConfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"

"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
)

const (
waitDDLRetryCount = 10
schemaLeaseTime = 10 * time.Millisecond
)

var (
sessionVars = []string{"sql_mode", "tidb_skip_utf8_check"}
)

// Tracker is used to track schema locally.
type Tracker struct {
store kv.Storage
dom *domain.Domain
se session.Session
}

// NewTracker creates a new tracker.
func NewTracker(sessionCfg map[string]string) (*Tracker, error) {
// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// some variable from downstream TiDB using `tidbConn`.
func NewTracker(sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker, error) {
// NOTE: tidb uses a **global** config so can't isolate tracker's config from each other. If that isolation is needed,
// we might SetGlobalConfig before every call to tracker, or use some patch like https://github.com/bouk/monkey
toSet := tidbConfig.NewConfig()
toSet.AlterPrimaryKey = true
tidbConfig.StoreGlobalConfig(toSet)

if len(sessionCfg) == 0 {
sessionCfg = make(map[string]string)
var ignoredColumn interface{}
for _, k := range sessionVars {
rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("show variables like '%s'", k))
if err2 != nil {
return nil, err2
}
if rows.Next() {
var value string
if err3 := rows.Scan(&ignoredColumn, &value); err3 != nil {
return nil, err3
}
sessionCfg[k] = value
}
if err2 = rows.Close(); err2 != nil {
return nil, err2
}
if err2 = rows.Err(); err2 != nil {
return nil, err2
}
}
}

store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.MockTiKV))
if err != nil {
return nil, err
Expand Down
103 changes: 86 additions & 17 deletions pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schema_test
package schema

import (
"context"
"database/sql"
"encoding/json"
"sort"
"testing"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/dm/pkg/schema"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"go.uber.org/zap/zapcore"

"github.com/pingcap/dm/pkg/conn"
)

func Test(t *testing.T) {
Expand All @@ -32,17 +35,78 @@ func Test(t *testing.T) {

var _ = Suite(&trackerSuite{})

type trackerSuite struct{}
var (
defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"}
)

type trackerSuite struct {
baseConn *conn.BaseConn
db *sql.DB
backupKeys []string
}

func (s *trackerSuite) SetUpSuite(c *C) {
s.backupKeys = sessionVars
sessionVars = []string{"sql_mode"}
db, _, err := sqlmock.New()
s.db = db
c.Assert(err, IsNil)
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
s.baseConn = conn.NewBaseConn(con, nil)
}

func (s *trackerSuite) TearDownSuite(c *C) {
s.db.Close()
sessionVars = s.backupKeys
}

func (s *trackerSuite) TestSessionCfg(c *C) {
func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
log.SetLevel(zapcore.ErrorLevel)

db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)

// user give correct session config
_, err = NewTracker(defaultTestSessionCfg, baseConn)
c.Assert(err, IsNil)

// user give wrong session session, will return error
sessionCfg := map[string]string{"sql_mode": "HaHa"}
tracker, err := schema.NewTracker(sessionCfg)
_, err = NewTracker(sessionCfg, baseConn)
c.Assert(err, NotNil)

tracker, err = schema.NewTracker(nil)
// discover session config failed, will return error
mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "HaHa"))
_, err = NewTracker(nil, baseConn)
c.Assert(err, NotNil)

// empty or default config in downstream
mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", ""))
tracker, err := NewTracker(nil, baseConn)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
err = tracker.Exec(context.Background(), "", "create database testdb;")
c.Assert(err, IsNil)

// found session config in downstream
mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE"))
tracker, err = NewTracker(nil, baseConn)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
c.Assert(tracker.se.GetSessionVars().SQLMode.HasOnlyFullGroupBy(), IsTrue)
c.Assert(tracker.se.GetSessionVars().SQLMode.HasStrictMode(), IsTrue)

ctx := context.Background()
err = tracker.Exec(ctx, "", "create database testdb;")
c.Assert(err, IsNil)
Expand All @@ -51,11 +115,12 @@ func (s *trackerSuite) TestSessionCfg(c *C) {
err = tracker.Exec(ctx, "testdb", "create table foo (a varchar(255) primary key, b DATETIME NOT NULL DEFAULT '0000-00-00 00:00:00')")
c.Assert(err, NotNil)

// set session config
// user set session config, get tracker config from downstream
// no `STRICT_TRANS_TABLES`, no error now
sessionCfg = map[string]string{"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES"}
tracker, err = schema.NewTracker(sessionCfg)
tracker, err = NewTracker(sessionCfg, baseConn)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

err = tracker.Exec(ctx, "", "create database testdb;")
c.Assert(err, IsNil)
Expand All @@ -75,30 +140,34 @@ func (s *trackerSuite) TestSessionCfg(c *C) {
cts, err = tracker.GetCreateTable(context.Background(), "testdb", "foo")
c.Assert(err, IsNil)
c.Assert(cts, Equals, "CREATE TABLE \"foo\" ( \"a\" varchar(255) NOT NULL, PRIMARY KEY (\"a\")) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")

// test alter primary key
err = tracker.Exec(ctx, "testdb", "alter table \"foo\" drop primary key")
c.Assert(err, IsNil)
}

func (s *trackerSuite) TestDDL(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// Table shouldn't exist before initialization.
_, err = tracker.GetTable("testdb", "foo")
c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`)
c.Assert(schema.IsTableNotExists(err), IsTrue)
c.Assert(IsTableNotExists(err), IsTrue)

_, err = tracker.GetCreateTable(context.Background(), "testdb", "foo")
c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`)
c.Assert(schema.IsTableNotExists(err), IsTrue)
c.Assert(IsTableNotExists(err), IsTrue)

ctx := context.Background()
err = tracker.Exec(ctx, "", "create database testdb;")
c.Assert(err, IsNil)

_, err = tracker.GetTable("testdb", "foo")
c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`)
c.Assert(schema.IsTableNotExists(err), IsTrue)
c.Assert(IsTableNotExists(err), IsTrue)

// Now create the table with 3 columns.
err = tracker.Exec(ctx, "testdb", "create table foo (a varchar(255) primary key, b varchar(255) as (concat(a, a)), c int)")
Expand Down Expand Up @@ -147,7 +216,7 @@ func (s *trackerSuite) TestDDL(c *C) {
func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

ctx := context.Background()
Expand Down Expand Up @@ -186,7 +255,7 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// We cannot create a table without a database.
Expand Down Expand Up @@ -214,7 +283,7 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
func (s *trackerSuite) TestMultiDrop(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

ctx := context.Background()
Expand Down Expand Up @@ -258,7 +327,7 @@ func (aj asJSON) String() string {
func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// Create some sort of complicated table.
Expand Down Expand Up @@ -322,7 +391,7 @@ func (s *trackerSuite) TestAllSchemas(c *C) {
log.SetLevel(zapcore.ErrorLevel)
ctx := context.Background()

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// nothing should exist...
Expand Down
Loading