From 479b91c9ea3cfd0bddc4cfd431b1ce32286af65a Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Sun, 28 Jun 2020 14:47:20 +0800 Subject: [PATCH] flush global checkpoint when first flush checkpoint --- syncer/checkpoint.go | 3 ++- syncer/checkpoint_test.go | 38 ++++++++++++++++++++++++++------------ syncer/syncer_test.go | 25 ++++++++++++++++++++++++- 3 files changed, 52 insertions(+), 14 deletions(-) diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index d4efd7b4da..d1717209e5 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -264,6 +264,7 @@ func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error { } cp.globalPoint = newBinlogPoint(minCheckpoint, minCheckpoint) + cp.globalPointSaveTime = time.Time{} cp.points = make(map[string]map[string]*binlogPoint) @@ -372,7 +373,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl sqls := make([]string, 0, 100) args := make([][]interface{}, 0, 100) - if cp.globalPoint.outOfDate() { + if cp.globalPoint.outOfDate() || cp.globalPointSaveTime.IsZero() { posG := cp.GlobalPoint() sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, posG.Name, posG.Pos, true) sqls = append(sqls, sqlG) diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index c04ad95427..00a318309f 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -129,22 +129,10 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.cfg.Dir = oldDir }() - // try load from mydumper's output pos1 := mysql.Position{ Name: "mysql-bin.000003", Pos: 1943, } - dir, err := ioutil.TempDir("", "test_global_checkpoint") - c.Assert(err, IsNil) - defer os.RemoveAll(dir) - - filename := filepath.Join(dir, "metadata") - err = ioutil.WriteFile(filename, []byte( - fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %d\n\tGTID:\n\nSHOW SLAVE STATUS:\n\tHost: %s\n\tLog: %s\n\tPos: %d\n\tGTID:\n\n", pos1.Name, pos1.Pos, "slave_host", pos1.Name, pos1.Pos+1000)), - 0644) - c.Assert(err, IsNil) - s.cfg.Mode = config.ModeAll - s.cfg.Dir = dir s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil)) err = cp.Load(tctx) @@ -236,6 +224,32 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { c.Assert(err, IsNil) c.Assert(cp.GlobalPoint(), Equals, minCheckpoint) c.Assert(cp.FlushedGlobalPoint(), Equals, minCheckpoint) + + // try load from mydumper's output + dir, err := ioutil.TempDir("", "test_global_checkpoint") + c.Assert(err, IsNil) + defer os.RemoveAll(dir) + + filename := filepath.Join(dir, "metadata") + err = ioutil.WriteFile(filename, []byte( + fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %d\n\tGTID:\n\nSHOW SLAVE STATUS:\n\tHost: %s\n\tLog: %s\n\tPos: %d\n\tGTID:\n\n", pos1.Name, pos1.Pos, "slave_host", pos1.Name, pos1.Pos+1000)), + 0644) + c.Assert(err, IsNil) + s.cfg.Mode = config.ModeAll + s.cfg.Dir = dir + cp.LoadMeta() + + // should flush because globalPointSaveTime is zero + s.mock.ExpectBegin() + s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, true, pos1.Name, pos1.Pos).WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock.ExpectCommit() + err = cp.FlushPointsExcept(tctx, nil, nil, nil) + c.Assert(err, IsNil) + s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil)) + err = cp.Load(tctx) + c.Assert(err, IsNil) + c.Assert(cp.GlobalPoint(), Equals, pos1) + c.Assert(cp.FlushedGlobalPoint(), Equals, pos1) } func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 9055811bd0..7d53cd23cc 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -218,6 +218,14 @@ func (s *testSyncerSuite) mockParser(db *sql.DB, mock sqlmock.Sqlmock) (*parser. return utils.GetParser(db, false) } +func (s *testSyncerSuite) mockCheckPointMetaFirstTime(checkPointMock sqlmock.Sqlmock) { + checkPointMock.ExpectBegin() + // here we flush global checkpoint first time + checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectExec(fmt.Sprintf("DELETE FROM `%s`.`%s_syncer_sharding_meta", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectCommit() +} + func (s *testSyncerSuite) mockCheckPointMeta(checkPointMock sqlmock.Sqlmock) { checkPointMock.ExpectBegin() checkPointMock.ExpectExec(fmt.Sprintf("DELETE FROM `%s`.`%s_syncer_sharding_meta", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -1061,10 +1069,25 @@ func (s *testSyncerSuite) TestCasuality(c *C) { c.Assert(key, Equals, "b") // will detect casuality and add a flush job + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + dbConn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + + syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + syncer.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background()) + + mock.ExpectBegin() + mock.ExpectExec(".*INSERT INTO .* VALUES.* ON DUPLICATE KEY UPDATE.*").WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() key, err = syncer.resolveCasuality([]string{"a", "b"}) c.Assert(err, IsNil) c.Assert(key, Equals, "a") + if err := mock.ExpectationsWereMet(); err != nil { + c.Errorf("checkpoint db unfulfilled expectations: %s", err) + } + wg.Wait() } @@ -1268,7 +1291,7 @@ func (s *testSyncerSuite) TestSharding(c *C) { AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) // mock checkpoint db after create db table1 table2 - s.mockCheckPointMeta(checkPointMock) + s.mockCheckPointMetaFirstTime(checkPointMock) s.mockCheckPointCreate(checkPointMock) s.mockCheckPointMeta(checkPointMock) s.mockCheckPointCreate(checkPointMock)