Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

checkpoint: cherrypick flush global checkpoint when first time flush checkpoint #763

Merged
merged 1 commit into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error {
}

cp.globalPoint = newBinlogPoint(minCheckpoint, minCheckpoint)
cp.globalPointSaveTime = time.Time{}

cp.points = make(map[string]map[string]*binlogPoint)

Expand Down Expand Up @@ -372,7 +373,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl
sqls := make([]string, 0, 100)
args := make([][]interface{}, 0, 100)

if cp.globalPoint.outOfDate() {
if cp.globalPoint.outOfDate() || cp.globalPointSaveTime.IsZero() {
posG := cp.GlobalPoint()
sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, posG.Name, posG.Pos, true)
sqls = append(sqls, sqlG)
Expand Down
38 changes: 26 additions & 12 deletions syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,22 +129,10 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
s.cfg.Dir = oldDir
}()

// try load from mydumper's output
pos1 := mysql.Position{
Name: "mysql-bin.000003",
Pos: 1943,
}
dir, err := ioutil.TempDir("", "test_global_checkpoint")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

filename := filepath.Join(dir, "metadata")
err = ioutil.WriteFile(filename, []byte(
fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %d\n\tGTID:\n\nSHOW SLAVE STATUS:\n\tHost: %s\n\tLog: %s\n\tPos: %d\n\tGTID:\n\n", pos1.Name, pos1.Pos, "slave_host", pos1.Name, pos1.Pos+1000)),
0644)
c.Assert(err, IsNil)
s.cfg.Mode = config.ModeAll
s.cfg.Dir = dir

s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx)
Expand Down Expand Up @@ -236,6 +224,32 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint(), Equals, minCheckpoint)
c.Assert(cp.FlushedGlobalPoint(), Equals, minCheckpoint)

// try load from mydumper's output
dir, err := ioutil.TempDir("", "test_global_checkpoint")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

filename := filepath.Join(dir, "metadata")
err = ioutil.WriteFile(filename, []byte(
fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %d\n\tGTID:\n\nSHOW SLAVE STATUS:\n\tHost: %s\n\tLog: %s\n\tPos: %d\n\tGTID:\n\n", pos1.Name, pos1.Pos, "slave_host", pos1.Name, pos1.Pos+1000)),
0644)
c.Assert(err, IsNil)
s.cfg.Mode = config.ModeAll
s.cfg.Dir = dir
cp.LoadMeta()

// should flush because globalPointSaveTime is zero
s.mock.ExpectBegin()
s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, true, pos1.Name, pos1.Pos).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint(), Equals, pos1)
c.Assert(cp.FlushedGlobalPoint(), Equals, pos1)
}

func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
Expand Down
25 changes: 24 additions & 1 deletion syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ func (s *testSyncerSuite) mockParser(db *sql.DB, mock sqlmock.Sqlmock) (*parser.
return utils.GetParser(db, false)
}

func (s *testSyncerSuite) mockCheckPointMetaFirstTime(checkPointMock sqlmock.Sqlmock) {
checkPointMock.ExpectBegin()
// here we flush global checkpoint first time
checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectExec(fmt.Sprintf("DELETE FROM `%s`.`%s_syncer_sharding_meta", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectCommit()
}

func (s *testSyncerSuite) mockCheckPointMeta(checkPointMock sqlmock.Sqlmock) {
checkPointMock.ExpectBegin()
checkPointMock.ExpectExec(fmt.Sprintf("DELETE FROM `%s`.`%s_syncer_sharding_meta", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down Expand Up @@ -1061,10 +1069,25 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
c.Assert(key, Equals, "b")

// will detect casuality and add a flush job
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
dbConn, err := db.Conn(context.Background())
c.Assert(err, IsNil)

syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
syncer.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background())

mock.ExpectBegin()
mock.ExpectExec(".*INSERT INTO .* VALUES.* ON DUPLICATE KEY UPDATE.*").WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
key, err = syncer.resolveCasuality([]string{"a", "b"})
c.Assert(err, IsNil)
c.Assert(key, Equals, "a")

if err := mock.ExpectationsWereMet(); err != nil {
c.Errorf("checkpoint db unfulfilled expectations: %s", err)
}

wg.Wait()
}

Expand Down Expand Up @@ -1268,7 +1291,7 @@ func (s *testSyncerSuite) TestSharding(c *C) {
AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"))

// mock checkpoint db after create db table1 table2
s.mockCheckPointMeta(checkPointMock)
s.mockCheckPointMetaFirstTime(checkPointMock)
s.mockCheckPointCreate(checkPointMock)
s.mockCheckPointMeta(checkPointMock)
s.mockCheckPointCreate(checkPointMock)
Expand Down