diff --git a/.travis.yml b/.travis.yml index d097347ed..29b9a633c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -70,7 +70,7 @@ before_install: script: - rm -f go.sum # - docker ps - - travis_wait make dev upload-coverage + - travis_wait 30 make dev upload-coverage after_failure: - netstat -nltp diff --git a/Makefile b/Makefile index 35108e799..de10bfd95 100644 --- a/Makefile +++ b/Makefile @@ -159,7 +159,7 @@ ifeq ("$(TRAVIS_COVERAGE)", "1") else @echo "Running in native mode." @export log_level=error; \ - $(GOTEST) -timeout 20m -ldflags '$(TEST_LDFLAGS)' -cover $(PACKAGES) || { $(GOFAIL_DISABLE); exit 1; } + $(GOTEST) -timeout 30m -ldflags '$(TEST_LDFLAGS)' -cover $(PACKAGES) || { $(GOFAIL_DISABLE); exit 1; } endif @$(GOFAIL_DISABLE) @@ -167,7 +167,7 @@ race: parserlib $(GO) get github.com/etcd-io/gofail@v0.0.0-20180808172546-51ce9a71510a @$(GOFAIL_ENABLE) @export log_level=debug; \ - $(GOTEST) -timeout 20m -race $(PACKAGES) || { $(GOFAIL_DISABLE); exit 1; } + $(GOTEST) -timeout 30m -race $(PACKAGES) || { $(GOFAIL_DISABLE); exit 1; } @$(GOFAIL_DISABLE) leak: parserlib diff --git a/session/inception_result.go b/session/inception_result.go index 240d5e4ef..0a14f103b 100644 --- a/session/inception_result.go +++ b/session/inception_result.go @@ -80,6 +80,7 @@ type Record struct { DBName string TableName string TableInfo *TableInfo + // ddl回滚 DDLRollback string OPID string @@ -88,6 +89,11 @@ type Record struct { // 是否开启OSC useOsc bool + + // update多表时,记录多余的表 + // update多表时,默认set第一列的表为主表,其余表才会记录到该处 + // 仅在发现多表操作时,初始化该参数 + MultiTables map[string]*TableInfo } type recordSet struct { diff --git a/session/parser.go b/session/parser.go index a681c6eda..748f9d9bc 100644 --- a/session/parser.go +++ b/session/parser.go @@ -111,7 +111,13 @@ func (s *session) GetNextBackupRecord() *Record { if r.StageStatus != StatusExecFail { r.StageStatus = StatusBackupFail } + // 清理已删除的列 clearDeleteColumns(r.TableInfo) + if r.MultiTables != nil { + for _, t := range r.MultiTables { + clearDeleteColumns(t) + } + } return r } @@ -146,6 +152,7 @@ func configPrimaryKey(t *TableInfo) { } } +// clearDeleteColumns 清理已删除的列,方便解析binlog func clearDeleteColumns(t *TableInfo) { if t == nil || t.IsClear { return @@ -291,7 +298,9 @@ func (s *session) Parser(ctx context.Context) { if s.checkFilter(event, record, currentThreadID) { changeRows += len(event.Rows) _, err = s.generateDeleteSql(record.TableInfo, event, e) - s.checkError(err) + if err != nil { + log.Error(err) + } } else { goto ENDCHECK } @@ -303,7 +312,9 @@ func (s *session) Parser(ctx context.Context) { if s.checkFilter(event, record, currentThreadID) { changeRows += len(event.Rows) _, err = s.generateInsertSql(record.TableInfo, event, e) - s.checkError(err) + if err != nil { + log.Error(err) + } } else { goto ENDCHECK } @@ -311,10 +322,16 @@ func (s *session) Parser(ctx context.Context) { case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: if event, ok := e.Event.(*replication.RowsEvent); ok { - if s.checkFilter(event, record, currentThreadID) { + if ok, t := s.checkUpdateFilter(event, record, currentThreadID); ok { changeRows += len(event.Rows) / 2 - _, err = s.generateUpdateSql(record.TableInfo, event, e) - s.checkError(err) + if t != nil { + _, err = s.generateUpdateSql(t, event, e) + } else { + _, err = s.generateUpdateSql(record.TableInfo, event, e) + } + if err != nil { + log.Error(err) + } } else { goto ENDCHECK } @@ -404,6 +421,47 @@ func (s *session) checkFilter(event *replication.RowsEvent, return true } +// checkUpdateFilter 检查update的筛选条件 +// update会涉及多表更新问题,所以需要把匹配到的表返回 +func (s *session) checkUpdateFilter(event *replication.RowsEvent, + record *Record, currentThreadID uint32) (bool, *TableInfo) { + var multiTable *TableInfo + if record.MultiTables == nil { + if !strings.EqualFold(string(event.Table.Schema), record.TableInfo.Schema) || + !strings.EqualFold(string(event.Table.Table), record.TableInfo.Name) { + return false, nil + } + } else { + found := false + if strings.EqualFold(string(event.Table.Schema), record.TableInfo.Schema) && + strings.EqualFold(string(event.Table.Table), record.TableInfo.Name) { + found = true + } else { + for _, t := range record.MultiTables { + if strings.EqualFold(string(event.Table.Schema), t.Schema) && + strings.EqualFold(string(event.Table.Table), t.Name) { + multiTable = t + found = true + break + } + } + } + if !found { + return false, nil + } + } + + if currentThreadID == 0 && s.DBType == DBTypeMariaDB { + if record.ErrLevel != 1 { + record.AppendErrorNo(ErrNotFoundThreadId, s.DBVersion) + } + return true, multiTable + } else if record.ThreadId != currentThreadID { + return false, nil + } + return true, multiTable +} + // 解析的sql写入缓存,并定期入库 func (s *session) myWrite(b []byte, binEvent *replication.BinlogEvent, opid string, table string, record *Record) { @@ -517,7 +575,9 @@ func (s *session) generateInsertSql(t *TableInfo, e *replication.RowsEvent, } r, err := InterpolateParams(sql, vv, s.Inc.HexBlob) - s.checkError(err) + if err != nil { + log.Error(err) + } s.write(r, binEvent) } @@ -580,7 +640,9 @@ func (s *session) generateDeleteSql(t *TableInfo, e *replication.RowsEvent, newSql := strings.Join([]string{sql, strings.Join(columnNames, " AND")}, "") r, err := InterpolateParams(newSql, vv, s.Inc.HexBlob) - s.checkError(err) + if err != nil { + log.Error(err) + } s.write(r, binEvent) @@ -760,7 +822,9 @@ func (s *session) generateUpdateSql(t *TableInfo, e *replication.RowsEvent, newSql = strings.Join([]string{sql, strings.Join(columnNames, " AND")}, "") newValues = append(newValues, oldValues...) r, err := InterpolateParams(newSql, newValues, s.Inc.HexBlob) - s.checkError(err) + if err != nil { + log.Error(err) + } s.write(r, binEvent) diff --git a/session/session_inception.go b/session/session_inception.go index 57b7a68ad..ffbd38d78 100644 --- a/session/session_inception.go +++ b/session/session_inception.go @@ -6686,14 +6686,20 @@ func (s *session) checkChangeDB(node *ast.UseStmt, sql string) { s.DBName = node.DBName // 新建库跳过use 切换 - if s.checkDBExists(node.DBName, true) && !s.dbCacheList[strings.ToLower(node.DBName)].IsNew { - _, err := s.Exec(fmt.Sprintf("USE `%s`", node.DBName), true) - if err != nil { - log.Errorf("con:%d %v", s.sessionVars.ConnectionID, err) - if myErr, ok := err.(*mysqlDriver.MySQLError); ok { - s.AppendErrorMessage(myErr.Message) - } else { - s.AppendErrorMessage(err.Error()) + if s.checkDBExists(node.DBName, true) { + key := node.DBName + if s.IgnoreCase() { + key = strings.ToLower(key) + } + if v, ok := s.dbCacheList[key]; ok && !v.IsNew { + _, err := s.Exec(fmt.Sprintf("USE `%s`", node.DBName), true) + if err != nil { + log.Errorf("con:%d %v", s.sessionVars.ConnectionID, err) + if myErr, ok := err.(*mysqlDriver.MySQLError); ok { + s.AppendErrorMessage(myErr.Message) + } else { + s.AppendErrorMessage(err.Error()) + } } } } @@ -6987,6 +6993,7 @@ func (s *session) checkUpdate(node *ast.UpdateStmt, sql string) { } } } + } } @@ -7068,7 +7075,28 @@ func (s *session) checkUpdate(node *ast.UpdateStmt, sql string) { l.Column.Table = model.NewCIStr(s.myRecord.TableInfo.Name) } - s.checkFieldItem(l.Column, tableInfoList) + if s.checkFieldItem(l.Column, tableInfoList) { + + // update多表操作 + // set不同的表 + // 存储其他表到MultiTables对象 + if len(tableInfoList) > 1 { + if t := getFieldWithTableInfo(l.Column, tableInfoList); t != nil { + if !strings.EqualFold(t.Schema, s.myRecord.TableInfo.Schema) || + !strings.EqualFold(t.Name, s.myRecord.TableInfo.Name) { + key := fmt.Sprintf("%s.%s", t.Schema, t.Name) + key = strings.ToLower(key) + + if s.myRecord.MultiTables == nil { + s.myRecord.MultiTables = make(map[string]*TableInfo, 0) + s.myRecord.MultiTables[key] = t + } else if _, ok := s.myRecord.MultiTables[key]; !ok { + s.myRecord.MultiTables[key] = t + } + } + } + } + } // 多表update情况时,下面的判断会有问题 // found := false @@ -7320,6 +7348,29 @@ func (s *session) checkFieldItem(name *ast.ColumnName, tables []*TableInfo) bool } } +// getFieldWithTableInfo 获取字段对应的表信息 +func getFieldWithTableInfo(name *ast.ColumnName, tables []*TableInfo) *TableInfo { + db := name.Schema.L + for _, t := range tables { + var tName string + if t.AsName != "" { + tName = t.AsName + } else { + tName = t.Name + } + if name.Table.L != "" && (db == "" || strings.EqualFold(t.Schema, db)) && + (strings.EqualFold(tName, name.Table.L)) || + name.Table.L == "" { + for _, field := range t.Fields { + if strings.EqualFold(field.Field, name.Name.L) && !field.IsDeleted { + return t + } + } + } + } + return nil +} + // getFieldItem 获取字段信息 func getFieldInfo(name *ast.ColumnName, tables []*TableInfo) (*FieldInfo, string) { db := name.Schema.L diff --git a/session/session_inception_backup_test.go b/session/session_inception_backup_test.go index 13bfae239..d5186f14b 100644 --- a/session/session_inception_backup_test.go +++ b/session/session_inception_backup_test.go @@ -431,19 +431,95 @@ func (s *testSessionIncBackupSuite) TestUpdate(c *C) { "UPDATE `test_inc`.`t1` SET `c1`=123456789012.1234 WHERE `id`=1;", Commentf("%v", res.Rows())) // -------------------- 多表update ------------------- + config.GetGlobalConfig().Inc.EnableMinimalRollback = false sql = `drop table if exists table1;drop table if exists table2; create table table1(id1 int primary key,c1 int,c2 int); create table table2(id2 int primary key,c1 int,c2 int,c22 int); insert into table1 values(1,1,1),(2,1,1); insert into table2 values(1,1,1,null),(2,1,null,null); update table1 t1,table2 t2 set t1.c1=10,t2.c22=20 where t1.id1=t2.id2 and t2.c1=1;` - // res = s.mustRunBackup(c, sql) - // row = res.Rows()[int(s.tk.Se.AffectedRows())-1] - // backup = s.query("table1", row[7].(string)) - // c.Assert(backup, Equals, "UPDATE `test_inc`.`t1` SET `c1`=123456789012.1234 WHERE `id`=1;", Commentf("%v", res.Rows())) res = s.mustRunBackup(c, sql) s.assertRows(c, res.Rows()[int(s.tk.Se.AffectedRows())-1:], + "UPDATE `test_inc`.`table2` SET `id2`=1, `c1`=1, `c2`=1, `c22`=NULL WHERE `id2`=1;", + "UPDATE `test_inc`.`table2` SET `id2`=2, `c1`=1, `c2`=NULL, `c22`=NULL WHERE `id2`=2;", + "UPDATE `test_inc`.`table1` SET `id1`=1, `c1`=1, `c2`=1 WHERE `id1`=1;", + "UPDATE `test_inc`.`table1` SET `id1`=2, `c1`=1, `c2`=1 WHERE `id1`=2;", + ) + + sql = `drop table if exists table1;drop table if exists table2; + create table table1(id1 int primary key,c1 int,c2 int); + create table table2(id2 int primary key,c1 int,c2 int,c22 int); + insert into table1 values(1,1,1),(2,1,1); + insert into table2 values(1,1,1,null),(2,1,null,null); + update table1 t1,table2 t2 set t2.c22=20,t1.c1=10 where t1.id1=t2.id2 and t2.c1=1;` + + res = s.mustRunBackup(c, sql) + s.assertRows(c, res.Rows()[int(s.tk.Se.AffectedRows())-1:], + "UPDATE `test_inc`.`table2` SET `id2`=1, `c1`=1, `c2`=1, `c22`=NULL WHERE `id2`=1;", + "UPDATE `test_inc`.`table2` SET `id2`=2, `c1`=1, `c2`=NULL, `c22`=NULL WHERE `id2`=2;", + "UPDATE `test_inc`.`table1` SET `id1`=1, `c1`=1, `c2`=1 WHERE `id1`=1;", + "UPDATE `test_inc`.`table1` SET `id1`=2, `c1`=1, `c2`=1 WHERE `id1`=2;", + ) + + sql = `drop table if exists table1;drop table if exists table2; + create table table1(id1 int primary key,c1 int,c2 int); + create table table2(id2 int primary key,c1 int,c2 int,c22 int); + insert into table1 values(1,1,1),(2,1,1); + insert into table2 values(1,1,1,null),(2,1,null,null); + update table1 t1,table2 t2 set c22=20,t1.c1=10 where t1.id1=t2.id2 and t2.c1=1;` + + res = s.mustRunBackup(c, sql) + s.assertRows(c, res.Rows()[int(s.tk.Se.AffectedRows())-1:], + "UPDATE `test_inc`.`table2` SET `id2`=1, `c1`=1, `c2`=1, `c22`=NULL WHERE `id2`=1;", + "UPDATE `test_inc`.`table2` SET `id2`=2, `c1`=1, `c2`=NULL, `c22`=NULL WHERE `id2`=2;", + "UPDATE `test_inc`.`table1` SET `id1`=1, `c1`=1, `c2`=1 WHERE `id1`=1;", + "UPDATE `test_inc`.`table1` SET `id1`=2, `c1`=1, `c2`=1 WHERE `id1`=2;", + ) + + config.GetGlobalConfig().Inc.EnableMinimalRollback = true + + sql = `drop table if exists table1;drop table if exists table2; + create table table1(id1 int primary key,c1 int,c2 int); + create table table2(id2 int primary key,c1 int,c2 int,c22 int); + insert into table1 values(1,1,1),(2,1,1); + insert into table2 values(1,1,1,null),(2,1,null,null); + update table1 t1,table2 t2 set t1.c1=10,t2.c22=20 where t1.id1=t2.id2 and t2.c1=1;` + + res = s.mustRunBackup(c, sql) + s.assertRows(c, res.Rows()[int(s.tk.Se.AffectedRows())-1:], + "UPDATE `test_inc`.`table2` SET `c22`=NULL WHERE `id2`=1;", + "UPDATE `test_inc`.`table2` SET `c22`=NULL WHERE `id2`=2;", + "UPDATE `test_inc`.`table1` SET `c1`=1 WHERE `id1`=1;", + "UPDATE `test_inc`.`table1` SET `c1`=1 WHERE `id1`=2;", + ) + + sql = `drop table if exists table1;drop table if exists table2; + create table table1(id1 int primary key,c1 int,c2 int); + create table table2(id2 int primary key,c1 int,c2 int,c22 int); + insert into table1 values(1,1,1),(2,1,1); + insert into table2 values(1,1,1,null),(2,1,null,null); + update table1 t1,table2 t2 set t2.c22=20,t1.c1=10 where t1.id1=t2.id2 and t2.c1=1;` + + res = s.mustRunBackup(c, sql) + s.assertRows(c, res.Rows()[int(s.tk.Se.AffectedRows())-1:], + "UPDATE `test_inc`.`table2` SET `c22`=NULL WHERE `id2`=1;", + "UPDATE `test_inc`.`table2` SET `c22`=NULL WHERE `id2`=2;", + "UPDATE `test_inc`.`table1` SET `c1`=1 WHERE `id1`=1;", + "UPDATE `test_inc`.`table1` SET `c1`=1 WHERE `id1`=2;", + ) + + sql = `drop table if exists table1;drop table if exists table2; + create table table1(id1 int primary key,c1 int,c2 int); + create table table2(id2 int primary key,c1 int,c2 int,c22 int); + insert into table1 values(1,1,1),(2,1,1); + insert into table2 values(1,1,1,null),(2,1,null,null); + update table1 t1,table2 t2 set c22=20,t1.c1=10 where t1.id1=t2.id2 and t2.c1=1;` + + res = s.mustRunBackup(c, sql) + s.assertRows(c, res.Rows()[int(s.tk.Se.AffectedRows())-1:], + "UPDATE `test_inc`.`table2` SET `c22`=NULL WHERE `id2`=1;", + "UPDATE `test_inc`.`table2` SET `c22`=NULL WHERE `id2`=2;", "UPDATE `test_inc`.`table1` SET `c1`=1 WHERE `id1`=1;", "UPDATE `test_inc`.`table1` SET `c1`=1 WHERE `id1`=2;", ) diff --git a/session/session_inception_common_test.go b/session/session_inception_common_test.go index 7e9befa75..0a707ba74 100644 --- a/session/session_inception_common_test.go +++ b/session/session_inception_common_test.go @@ -403,11 +403,11 @@ func (s *testCommon) assertRows(c *C, rows [][]interface{}, rollbackSqls ...stri for _, row := range rows { opid := "" backupDBName := "" - runSql := "" - if row[5] != nil { - runSql = row[5].(string) - } + // runSql := "" + // if row[5] != nil { + // runSql = row[5].(string) + // } affectedRows := 0 if row[6] != nil { @@ -446,7 +446,18 @@ func (s *testCommon) assertRows(c *C, rows [][]interface{}, rollbackSqls ...stri // } tableName := s.getObjectName(currentSql) - c.Assert(tableName, Not(Equals), "", Commentf("%v", currentSql)) + // 表名没有时,查询一下 + if tableName == "" { + sql := "select tablename from `%s`.`%s` where opid_time = ?" + sql = fmt.Sprintf(sql, backupDBName, s.remoteBackupTable) + rows, err := s.db.Raw(sql, opid).Rows() + c.Assert(err, IsNil) + for rows.Next() { + rows.Scan(&tableName) + } + rows.Close() + } + c.Assert(tableName, Not(Equals), "", Commentf("%v", row)) sql := "select rollback_statement from %s.`%s` where opid_time = ?;" sql = fmt.Sprintf(sql, backupDBName, tableName) @@ -464,13 +475,14 @@ func (s *testCommon) assertRows(c *C, rows [][]interface{}, rollbackSqls ...stri rows.Close() if affectedRows > 0 { - if strings.HasPrefix(runSql, "update") && - (strings.Contains(runSql, ",") || strings.Contains(runSql, "join")) { - // update多表时受影响行数为两表之和 - // 待实现... - } else { - c.Assert(affectedRows, Equals, len(result1), Commentf("%v", result1)) - } + // if strings.HasPrefix(runSql, "update") && + // (strings.Contains(runSql, ",") || strings.Contains(runSql, "join")) { + // // update多表时受影响行数为两表之和 + // // 待实现... + // } else { + // c.Assert(affectedRows, Equals, len(result1), Commentf("%v", result1)) + // } + c.Assert(affectedRows, Equals, len(result1), Commentf("%v", result1)) } result = append(result, result1...) @@ -541,6 +553,8 @@ func (s *testCommon) getObjectName(sql string) (name string) { name = tblName.Name.String() case *ast.UpdateStmt: + return "" + tblSrc := getLeftTable(node.TableRefs.TableRefs) if tblSrc == nil { log.Errorf("未找到表名!!! sql: %s", sql) @@ -552,6 +566,12 @@ func (s *testCommon) getObjectName(sql string) (name string) { return "" } + // for _, l := range node.List { + // originTable := l.Column.Table.L + // firstColumnName := l.Column.Name.O + + // } + name = tblName.Name.String() case *ast.DeleteStmt: tableRefs := node.TableRefs diff --git a/session/session_inception_tran_test.go b/session/session_inception_tran_test.go index 5c166bcc8..6fdf7bc84 100644 --- a/session/session_inception_tran_test.go +++ b/session/session_inception_tran_test.go @@ -15,7 +15,6 @@ package session_test import ( "fmt" - "strconv" "strings" "testing" @@ -24,8 +23,6 @@ import ( "github.com/hanchuanchuan/goInception/util/testkit" "github.com/jinzhu/gorm" . "github.com/pingcap/check" - - "github.com/hanchuanchuan/goInception/ast" ) var _ = Suite(&testSessionIncTranSuite{}) @@ -189,11 +186,9 @@ update t1 set c1='10' where id>0;`, 2) // 测试不同分批下的大量数据 s.insertMulti(c, 2) - s.insertMulti(c, 3) - s.insertMulti(c, 13) - s.insertMulti(c, 73) + s.insertMulti(c, 79) - // for i := 2; i <= 20; i++ { + // for i := 41; i <= 60; i++ { // s.insertMulti(c, i) // } } @@ -680,104 +675,6 @@ func (s *testSessionIncTranSuite) query(table, opid string) string { return strings.Join(result, "\n") } -func (s *testSessionIncTranSuite) assertRows(c *C, rows [][]interface{}, rollbackSqls ...string) error { - c.Assert(len(rows), Not(Equals), 0) - - inc := config.GetGlobalConfig().Inc - if s.db == nil || s.db.DB().Ping() != nil { - addr := fmt.Sprintf("%s:%s@tcp(%s:%d)/mysql?charset=utf8mb4&parseTime=True&loc=Local&maxAllowedPacket=4194304", - inc.BackupUser, inc.BackupPassword, inc.BackupHost, inc.BackupPort) - - db, err := gorm.Open("mysql", addr) - if err != nil { - fmt.Println(err) - return err - } - // 禁用日志记录器,不显示任何日志 - db.LogMode(false) - s.db = db - } - - // 有可能是 不同的表,不同的库 - - result := []string{} - - // affectedRows := 0 - // opid := "" - // backupDBName := "" - // sqlIndex := 0 - for _, row := range rows { - opid := "" - backupDBName := "" - affectedRows := 0 - if row[6] != nil { - a := row[6].(string) - affectedRows, _ = strconv.Atoi(a) - } - if row[7] != nil { - opid = row[7].(string) - } - if row[8] != nil { - backupDBName = row[8].(string) - } - currentSql := "" - if row[5] != nil { - currentSql = row[5].(string) - } - - if !strings.Contains(row[3].(string), "Backup Successfully") || strings.HasSuffix(opid, "00000000") { - continue - } - - // 获取表名(改为从语法中自动获取) - // sql := "select tablename from %s.%s where opid_time = ?" - // sql = fmt.Sprintf(sql, backupDBName, s.remoteBackupTable) - // tableName := "" - // rows, err := s.db.Raw(sql, opid).Rows() - // c.Assert(err, IsNil) - // for rows.Next() { - // rows.Scan(&tableName) - // } - // rows.Close() - - // if sqlIndex >= len(rollbackSqls) { - - // } - - tableName := s.getObjectName(currentSql) - c.Assert(tableName, Not(Equals), "", Commentf("%v", currentSql)) - - sql := "select rollback_statement from %s.`%s` where opid_time = ?;" - sql = fmt.Sprintf(sql, backupDBName, tableName) - rows, err := s.db.Raw(sql, opid).Rows() - c.Assert(err, IsNil) - str := "" - // count := 0 - - result1 := []string{} - for rows.Next() { - rows.Scan(&str) - result1 = append(result1, s.trim(str)) - // count++ - } - rows.Close() - - if affectedRows > 0 { - c.Assert(affectedRows, Equals, len(result1), Commentf("%v", result1)) - } - - result = append(result, result1...) - } - - c.Assert(len(result), Equals, len(rollbackSqls), Commentf("%v", result)) - - for i := range result { - c.Assert(result[i], Equals, rollbackSqls[i], Commentf("%v", result)) - } - - return nil -} - func (s *testSessionIncTranSuite) queryStatistics() []int { inc := config.GetGlobalConfig().Inc if s.db == nil || s.db.DB().Ping() != nil { @@ -981,101 +878,3 @@ func (s *testSessionIncTranSuite) TestStatistics(c *C) { c.Assert(v, Equals, result[i], Commentf("%v", statistics)) } } - -// getObjectName 解析操作表名 -func (s *testSessionIncTranSuite) getObjectName(sql string) (name string) { - - stmtNodes, _, _ := s.parser.Parse(sql, "utf8mb4", "utf8mb4_bin") - - for _, stmtNode := range stmtNodes { - switch node := stmtNode.(type) { - case *ast.InsertStmt: - tableRefs := node.Table - if tableRefs == nil || tableRefs.TableRefs == nil || tableRefs.TableRefs.Right != nil { - return "" - } - tblSrc, ok := tableRefs.TableRefs.Left.(*ast.TableSource) - if !ok { - return "" - } - if tblSrc.AsName.L != "" { - return "" - } - tblName, ok := tblSrc.Source.(*ast.TableName) - if !ok { - return "" - } - - name = tblName.Name.String() - - case *ast.UpdateStmt: - // name = node.Table.Name.String() - tableRefs := node.TableRefs - if tableRefs == nil || tableRefs.TableRefs == nil || tableRefs.TableRefs.Right != nil { - return "" - } - tblSrc, ok := tableRefs.TableRefs.Left.(*ast.TableSource) - if !ok { - return "" - } - if tblSrc.AsName.L != "" { - return "" - } - tblName, ok := tblSrc.Source.(*ast.TableName) - if !ok { - return "" - } - - name = tblName.Name.String() - case *ast.DeleteStmt: - // name = node.Table.Name.String() - tableRefs := node.TableRefs - if tableRefs == nil || tableRefs.TableRefs == nil || tableRefs.TableRefs.Right != nil { - return "" - } - tblSrc, ok := tableRefs.TableRefs.Left.(*ast.TableSource) - if !ok { - return "" - } - if tblSrc.AsName.L != "" { - return "" - } - tblName, ok := tblSrc.Source.(*ast.TableName) - if !ok { - return "" - } - - name = tblName.Name.String() - - case *ast.CreateDatabaseStmt, *ast.DropDatabaseStmt: - - case *ast.CreateTableStmt: - name = node.Table.Name.String() - case *ast.AlterTableStmt: - name = node.Table.Name.String() - case *ast.DropTableStmt: - for _, t := range node.Tables { - name = t.Name.String() - break - } - - case *ast.RenameTableStmt: - name = node.OldTable.Name.String() - - case *ast.TruncateTableStmt: - - name = node.Table.Name.String() - - case *ast.CreateIndexStmt: - name = node.Table.Name.String() - case *ast.DropIndexStmt: - name = node.Table.Name.String() - - default: - - } - - return name - } - return "" -}