Skip to content

Commit

Permalink
tracker: get some session variables from downtream (pingcap#1032)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Sep 21, 2020
1 parent dd6a4b7 commit cdd4c8e
Show file tree
Hide file tree
Showing 16 changed files with 184 additions and 46 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ ErrSchemaTrackerCannotParseDownstreamTable,[code=44008:class=schema-tracker:scop
ErrSchemaTrackerInvalidCreateTableStmt,[code=44009:class=schema-tracker:scope=internal:level=medium], "Message: %s is not a valid `CREATE TABLE` statement"
ErrSchemaTrackerRestoreStmtFail,[code=44010:class=schema-tracker:scope=internal:level=medium], "Message: fail to restore the statement"
ErrSchemaTrackerCannotDropTable,[code=44011:class=schema-tracker:scope=internal:level=high], "Message: failed to drop table for `%s`.`%s` in schema tracker"
ErrSchemaTrackerInit,[code=44012:class=schema-tracker:scope=internal:level=high], "Message: failed to create schema tracker"
ErrSchedulerNotStarted,[code=46001:class=scheduler:scope=internal:level=high], "Message: the scheduler has not started"
ErrSchedulerStarted,[code=46002:class=scheduler:scope=internal:level=medium], "Message: the scheduler has already started"
ErrSchedulerWorkerExist,[code=46003:class=scheduler:scope=internal:level=medium], "Message: dm-worker with name %s already exists"
Expand Down
13 changes: 5 additions & 8 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ func (c *RawDBConfig) SetMaxIdleConns(value int) *RawDBConfig {

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
// deprecated, mysql driver could automatically fetch this value
MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`
Session map[string]string `toml:"session" json:"session" yaml:"session"`

Expand Down Expand Up @@ -121,10 +122,6 @@ func (db *DBConfig) Decode(data string) error {

// Adjust adjusts the config.
func (db *DBConfig) Adjust() {
if db.MaxAllowedPacket == nil {
cloneV := defaultMaxAllowedPacket
db.MaxAllowedPacket = &cloneV
}
}

// SubTaskConfig is the configuration for SubTask
Expand Down
2 changes: 2 additions & 0 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
case stage, ok := <-stageCh:
if !ok {
closed = true
break
}
opType, err := w.operateSubTaskStageWithoutConfig(stage)
if err != nil {
Expand All @@ -422,6 +423,7 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
case err, ok := <-errCh:
if !ok {
closed = true
break
}
// TODO: deal with err
log.L().Error("WatchSubTaskStage received an error", zap.Error(err))
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2764,6 +2764,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-schema-tracker-44012]
message = "failed to create schema tracker"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-scheduler-46001]
message = "the scheduler has not started"
description = ""
Expand Down
4 changes: 2 additions & 2 deletions loader/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
printStatusInterval = time.Second * 5
)

// Status implements SubTaskUnit.Status
// Status implements Unit.Status
func (l *Loader) Status() interface{} {
finishedSize := l.finishedDataSize.Get()
totalSize := l.totalDataSize.Get()
Expand All @@ -41,7 +41,7 @@ func (l *Loader) Status() interface{} {
return s
}

// Error implements SubTaskUnit.Error
// Error implements Unit.Error
func (l *Loader) Error() interface{} {
return &pb.LoadError{}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/conn/basedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ var mockDB sqlmock.Sqlmock

// Apply will build BaseDB with DBConfig
func (d *DefaultDBProviderImpl) Apply(config config.DBConfig) (*BaseDB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=%d",
config.User, config.Password, config.Host, config.Port, *config.MaxAllowedPacket)
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0",
config.User, config.Password, config.Host, config.Port)

doFuncInClose := func() {}
if config.Security != nil && len(config.Security.SSLCA) != 0 &&
Expand Down
1 change: 1 addition & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
"unsupported drop integer primary key",
"Unsupported collation",
"Invalid default value for",
"Unsupported drop primary key",
}

// UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery
Expand Down
43 changes: 41 additions & 2 deletions pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,67 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
tidbConfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"

"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
)

const (
waitDDLRetryCount = 10
schemaLeaseTime = 10 * time.Millisecond
)

var (
sessionVars = []string{"sql_mode", "tidb_skip_utf8_check"}
)

// Tracker is used to track schema locally.
type Tracker struct {
store kv.Storage
dom *domain.Domain
se session.Session
}

// NewTracker creates a new tracker.
func NewTracker(sessionCfg map[string]string) (*Tracker, error) {
// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// some variable from downstream TiDB using `tidbConn`.
func NewTracker(sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker, error) {
// NOTE: tidb uses a **global** config so can't isolate tracker's config from each other. If that isolation is needed,
// we might SetGlobalConfig before every call to tracker, or use some patch like https://github.com/bouk/monkey
toSet := tidbConfig.NewConfig()
toSet.AlterPrimaryKey = true
tidbConfig.StoreGlobalConfig(toSet)

if len(sessionCfg) == 0 {
sessionCfg = make(map[string]string)
var ignoredColumn interface{}
for _, k := range sessionVars {
rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("show variables like '%s'", k))
if err2 != nil {
return nil, err2
}
if rows.Next() {
var value string
if err3 := rows.Scan(&ignoredColumn, &value); err3 != nil {
return nil, err3
}
sessionCfg[k] = value
}
if err2 = rows.Close(); err2 != nil {
return nil, err2
}
if err2 = rows.Err(); err2 != nil {
return nil, err2
}
}
}

store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.MockTiKV))
if err != nil {
return nil, err
Expand Down
103 changes: 86 additions & 17 deletions pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schema_test
package schema

import (
"context"
"database/sql"
"encoding/json"
"sort"
"testing"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/dm/pkg/schema"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"go.uber.org/zap/zapcore"

"github.com/pingcap/dm/pkg/conn"
)

func Test(t *testing.T) {
Expand All @@ -32,17 +35,78 @@ func Test(t *testing.T) {

var _ = Suite(&trackerSuite{})

type trackerSuite struct{}
var (
defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"}
)

type trackerSuite struct {
baseConn *conn.BaseConn
db *sql.DB
backupKeys []string
}

func (s *trackerSuite) SetUpSuite(c *C) {
s.backupKeys = sessionVars
sessionVars = []string{"sql_mode"}
db, _, err := sqlmock.New()
s.db = db
c.Assert(err, IsNil)
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
s.baseConn = conn.NewBaseConn(con, nil)
}

func (s *trackerSuite) TearDownSuite(c *C) {
s.db.Close()
sessionVars = s.backupKeys
}

func (s *trackerSuite) TestSessionCfg(c *C) {
func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
log.SetLevel(zapcore.ErrorLevel)

db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)

// user give correct session config
_, err = NewTracker(defaultTestSessionCfg, baseConn)
c.Assert(err, IsNil)

// user give wrong session session, will return error
sessionCfg := map[string]string{"sql_mode": "HaHa"}
tracker, err := schema.NewTracker(sessionCfg)
_, err = NewTracker(sessionCfg, baseConn)
c.Assert(err, NotNil)

tracker, err = schema.NewTracker(nil)
// discover session config failed, will return error
mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "HaHa"))
_, err = NewTracker(nil, baseConn)
c.Assert(err, NotNil)

// empty or default config in downstream
mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", ""))
tracker, err := NewTracker(nil, baseConn)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
err = tracker.Exec(context.Background(), "", "create database testdb;")
c.Assert(err, IsNil)

// found session config in downstream
mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE"))
tracker, err = NewTracker(nil, baseConn)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
c.Assert(tracker.se.GetSessionVars().SQLMode.HasOnlyFullGroupBy(), IsTrue)
c.Assert(tracker.se.GetSessionVars().SQLMode.HasStrictMode(), IsTrue)

ctx := context.Background()
err = tracker.Exec(ctx, "", "create database testdb;")
c.Assert(err, IsNil)
Expand All @@ -51,11 +115,12 @@ func (s *trackerSuite) TestSessionCfg(c *C) {
err = tracker.Exec(ctx, "testdb", "create table foo (a varchar(255) primary key, b DATETIME NOT NULL DEFAULT '0000-00-00 00:00:00')")
c.Assert(err, NotNil)

// set session config
// user set session config, get tracker config from downstream
// no `STRICT_TRANS_TABLES`, no error now
sessionCfg = map[string]string{"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES"}
tracker, err = schema.NewTracker(sessionCfg)
tracker, err = NewTracker(sessionCfg, baseConn)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

err = tracker.Exec(ctx, "", "create database testdb;")
c.Assert(err, IsNil)
Expand All @@ -75,30 +140,34 @@ func (s *trackerSuite) TestSessionCfg(c *C) {
cts, err = tracker.GetCreateTable(context.Background(), "testdb", "foo")
c.Assert(err, IsNil)
c.Assert(cts, Equals, "CREATE TABLE \"foo\" ( \"a\" varchar(255) NOT NULL, PRIMARY KEY (\"a\")) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")

// test alter primary key
err = tracker.Exec(ctx, "testdb", "alter table \"foo\" drop primary key")
c.Assert(err, IsNil)
}

func (s *trackerSuite) TestDDL(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// Table shouldn't exist before initialization.
_, err = tracker.GetTable("testdb", "foo")
c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`)
c.Assert(schema.IsTableNotExists(err), IsTrue)
c.Assert(IsTableNotExists(err), IsTrue)

_, err = tracker.GetCreateTable(context.Background(), "testdb", "foo")
c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`)
c.Assert(schema.IsTableNotExists(err), IsTrue)
c.Assert(IsTableNotExists(err), IsTrue)

ctx := context.Background()
err = tracker.Exec(ctx, "", "create database testdb;")
c.Assert(err, IsNil)

_, err = tracker.GetTable("testdb", "foo")
c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`)
c.Assert(schema.IsTableNotExists(err), IsTrue)
c.Assert(IsTableNotExists(err), IsTrue)

// Now create the table with 3 columns.
err = tracker.Exec(ctx, "testdb", "create table foo (a varchar(255) primary key, b varchar(255) as (concat(a, a)), c int)")
Expand Down Expand Up @@ -147,7 +216,7 @@ func (s *trackerSuite) TestDDL(c *C) {
func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

ctx := context.Background()
Expand Down Expand Up @@ -186,7 +255,7 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// We cannot create a table without a database.
Expand Down Expand Up @@ -214,7 +283,7 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
func (s *trackerSuite) TestMultiDrop(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

ctx := context.Background()
Expand Down Expand Up @@ -258,7 +327,7 @@ func (aj asJSON) String() string {
func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// Create some sort of complicated table.
Expand Down Expand Up @@ -322,7 +391,7 @@ func (s *trackerSuite) TestAllSchemas(c *C) {
log.SetLevel(zapcore.ErrorLevel)
ctx := context.Background()

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// nothing should exist...
Expand Down
Loading

0 comments on commit cdd4c8e

Please sign in to comment.