diff --git a/.github/workflows/upgrade-via-tiup.yml b/.github/workflows/upgrade-via-tiup.yml index 7c0f1771f9..512e08e8e0 100644 --- a/.github/workflows/upgrade-via-tiup.yml +++ b/.github/workflows/upgrade-via-tiup.yml @@ -98,7 +98,7 @@ jobs: SLACK_WEBHOOK: ${{ secrets.SLACK_NOTIFY }} uses: Ilshidur/action-slack@2.1.0 with: - args: "chaos job failed, see https://github.com/pingcap/dm/actions/runs/{{ GITHUB_RUN_ID }}" + args: "upgrade-via-tiup job failed, see https://github.com/pingcap/dm/actions/runs/{{ GITHUB_RUN_ID }}" # Debug via SSH if previous steps failed - name: Set up tmate session diff --git a/.github/workflows/upstream-switch.yml b/.github/workflows/upstream-switch.yml new file mode 100644 index 0000000000..8bebfa0253 --- /dev/null +++ b/.github/workflows/upstream-switch.yml @@ -0,0 +1,83 @@ +name: Upstream database switch + +on: + push: + branches: + - master + - release-2.0 + pull_request: + branches: + - master + - release-2.0 + +jobs: + upstream-database-switch: + name: upstream-database-switch + runs-on: ubuntu-18.04 + + steps: + - name: Set up Go 1.13 + uses: actions/setup-go@v2 + with: + go-version: 1.13 + + - name: Check out code + uses: actions/checkout@v2 + + - name: Cache go modules + uses: actions/cache@v2 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-dm-${{ hashFiles('**/go.sum') }} + + - name: Cache tools + uses: actions/cache@v2 + with: + path: | + **/tools + key: ${{ runner.os }}-dm-tools-${{ hashFiles('**/tools/go.sum') }} + + - name: Build DM binary + run: make dm_integration_test_build + + - name: Setup containers + run: | + docker-compose -f ./tests/upstream_switch/docker-compose.yml up -d + + - name: Run test cases + run: | + bash ./tests/upstream_switch/case.sh + + - name: Copy logs to hack permission + if: ${{ always() }} + run: | + mkdir ./logs + sudo cp -r -L /tmp/dm_test/upstream_switch/master/log ./logs/master + sudo cp -r -L /tmp/dm_test/upstream_switch/worker1/log ./logs/worker1 + sudo cp -r -L 
/tmp/dm_test/upstream_switch/worker2/log ./logs/worker2 + sudo chown -R runner ./logs + + # Update logs as artifact seems not stable, so we set `continue-on-error: true` here. + - name: Upload logs + continue-on-error: true + uses: actions/upload-artifact@v2 + if: ${{ always() }} + with: + name: upstream-switch-logs + path: | + ./logs + + # send Slack notify if failed. + # NOTE: With the exception of `GITHUB_TOKEN`, secrets are not passed to the runner when a workflow is triggered from a forked repository. + - name: Slack notification + if: ${{ failure() }} + env: + SLACK_WEBHOOK: ${{ secrets.SLACK_NOTIFY }} + uses: Ilshidur/action-slack@2.1.0 + with: + args: "upstream-switch job failed, see https://github.com/pingcap/dm/actions/runs/{{ GITHUB_RUN_ID }}" + + # Debug via SSH if previous steps failed + - name: Set up tmate session + if: ${{ failure() }} + uses: mxschmitt/action-tmate@v2 diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index ad8e66a84b..693ee630cf 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -230,6 +230,7 @@ ErrRelayTrimUUIDNotFound,[code=30040:class=relay-unit:scope=internal:level=high] ErrRelayRemoveFileFail,[code=30041:class=relay-unit:scope=internal:level=high], "Message: remove relay log %s %s" ErrRelayPurgeArgsNotValid,[code=30042:class=relay-unit:scope=internal:level=high], "Message: args (%T) %+v not valid" ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high], "Message: previousGTIDs %s not valid" +ErrRotateEventWithDifferentServerID,[code=30044:class=relay-unit:scope=internal:level=high], "Message: receive fake rotate event with different server_id, Workaround: Please use `resume-relay` command if upstream database has changed" ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper/dumpling runs with error, with output (may empty): %s" 
ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file." ErrDumpUnitGenBAList,[code=32003:class=dump-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file." diff --git a/errors.toml b/errors.toml index 7d8d275cb0..971e4e18bd 100644 --- a/errors.toml +++ b/errors.toml @@ -1390,6 +1390,12 @@ description = "" workaround = "" tags = ["internal", "high"] +[error.DM-relay-unit-30044] +message = "receive fake rotate event with different server_id" +description = "" +workaround = "Please use `resume-relay` command if upstream database has changed" +tags = ["internal", "high"] + [error.DM-dump-unit-32001] message = "mydumper/dumpling runs with error, with output (may empty): %s" description = "" @@ -2955,4 +2961,3 @@ message = "" description = "" workaround = "" tags = ["not-set", "high"] - diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go index 07bfec2fa9..4860dd3a17 100644 --- a/pkg/streamer/file.go +++ b/pkg/streamer/file.go @@ -16,6 +16,7 @@ package streamer import ( "context" "os" + "path" "path/filepath" "sort" "strings" @@ -42,6 +43,12 @@ const ( FileCmpBigger ) +// SwitchPath represents next binlog file path which should be switched +type SwitchPath struct { + nextUUID string + nextBinlogName string +} + // CollectAllBinlogFiles collects all valid binlog files in dir func CollectAllBinlogFiles(dir string) ([]string, error) { if dir == "" { @@ -189,20 +196,29 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) { // relaySubDirUpdated checks whether the relay sub directory updated // including file changed, created, removed, etc. 
func relaySubDirUpdated(ctx context.Context, watcherInterval time.Duration, dir string, - latestFilePath, latestFile string, latestFileSize int64) (string, error) { + latestFilePath, latestFile string, latestFileSize int64, updatePathCh chan string, errCh chan error) { + var err error + defer func() { + if err != nil { + errCh <- err + } + }() + // create polling watcher watcher2 := watcher.NewWatcher() // Add before Start // no need to Remove, it will be closed and release when return - err := watcher2.Add(dir) + err = watcher2.Add(dir) if err != nil { - return "", terror.ErrAddWatchForRelayLogDir.Delegate(err, dir) + err = terror.ErrAddWatchForRelayLogDir.Delegate(err, dir) + return } err = watcher2.Start(watcherInterval) if err != nil { - return "", terror.ErrWatcherStart.Delegate(err, dir) + err = terror.ErrWatcherStart.Delegate(err, dir) + return } defer watcher2.Close() @@ -269,60 +285,85 @@ func relaySubDirUpdated(ctx context.Context, watcherInterval time.Duration, dir // try collect newer relay log file to check whether newer exists before watching newerFiles, err := CollectBinlogFilesCmp(dir, latestFile, FileCmpBigger) if err != nil { - return "", terror.Annotatef(err, "collect newer files from %s in dir %s", latestFile, dir) + err = terror.Annotatef(err, "collect newer files from %s in dir %s", latestFile, dir) + return } // check the latest relay log file whether updated when adding watching and collecting newer cmp, err := fileSizeUpdated(latestFilePath, latestFileSize) if err != nil { - return "", err + return } else if cmp < 0 { - return "", terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath) + err = terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath) + return } else if cmp > 0 { - // the latest relay log file already updated, need to parse from it again (not need to re-collect relay log files) - return latestFilePath, nil + updatePathCh <- latestFilePath + return } else if len(newerFiles) > 0 { // check whether newer relay log file 
exists nextFilePath := filepath.Join(dir, newerFiles[0]) log.L().Info("newer relay log file is already generated, start parse from it", zap.String("new file", nextFilePath)) - return nextFilePath, nil + updatePathCh <- nextFilePath + return } res := <-result - return res.updatePath, res.err + if res.err != nil { + err = res.err + return + } + + updatePathCh <- res.updatePath } // needSwitchSubDir checks whether the reader need to switch to next relay sub directory -func needSwitchSubDir(relayDir, currentUUID, latestFilePath string, latestFileSize int64, UUIDs []string) ( - needSwitch, needReParse bool, nextUUID string, nextBinlogName string, err error) { - nextUUID, _, err = getNextUUID(currentUUID, UUIDs) - if err != nil { - return false, false, "", "", terror.Annotatef(err, "current UUID %s, UUIDs %v", currentUUID, UUIDs) - } else if len(nextUUID) == 0 { - // no next sub dir exists, not need to switch - return false, false, "", "", nil - } +func needSwitchSubDir(ctx context.Context, relayDir, currentUUID, latestFilePath string, latestFileSize int64, switchCh chan SwitchPath, errCh chan error) { + var ( + err error + nextUUID string + nextBinlogName string + uuids []string + ) + + ticker := time.NewTicker(watcherInterval) + defer func() { + ticker.Stop() + if err != nil { + errCh <- err + } + }() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // reload uuid + uuids, err = utils.ParseUUIDIndex(path.Join(relayDir, utils.UUIDIndexFilename)) + if err != nil { + return + } + nextUUID, _, err = getNextUUID(currentUUID, uuids) + if err != nil { + return + } + if len(nextUUID) == 0 { + continue + } - // try get the first binlog file in next sub directory - nextBinlogName, err = getFirstBinlogName(relayDir, nextUUID) - if err != nil { - // NOTE: current we can not handle `errors.IsNotFound(err)` easily - // because creating sub directory and writing relay log file are not atomic - // so we let user to pause syncing before switching relay's master 
server - return false, false, "", "", err - } + // try get the first binlog file in next sub directory + nextBinlogName, err = getFirstBinlogName(relayDir, nextUUID) + if err != nil { + // because creating sub directory and writing relay log file are not atomic + // just continue to observe subdir + if terror.ErrBinlogFilesNotFound.Equal(err) { + err = nil + continue + } + return + } - // check the latest relay log file whether updated when checking next sub directory - cmp, err := fileSizeUpdated(latestFilePath, latestFileSize) - if err != nil { - return false, false, "", "", err - } else if cmp < 0 { - return false, false, "", "", terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath) - } else if cmp > 0 { - // the latest relay log file already updated, need to parse from it again (not need to switch to sub directory) - return false, true, "", "", nil + switchCh <- SwitchPath{nextUUID, nextBinlogName} + return + } } - - // need to switch to next sub directory - return true, false, nextUUID, nextBinlogName, nil } diff --git a/pkg/streamer/file_test.go b/pkg/streamer/file_test.go index 23ae78ddee..e92ea2aae5 100644 --- a/pkg/streamer/file_test.go +++ b/pkg/streamer/file_test.go @@ -14,12 +14,13 @@ package streamer import ( + "bytes" "context" "fmt" "io/ioutil" "os" + "path" "path/filepath" - "regexp" "sync" "time" @@ -307,15 +308,19 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { data = []byte("meaningless file content") size = int64(len(data)) watcherInterval = 100 * time.Millisecond + updatePathCh = make(chan string, 1) + errCh = make(chan error, 1) ) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // a. 
relay log dir not exist - upNotExist, err := relaySubDirUpdated(ctx, watcherInterval, "/not-exists-directory", "/not-exists-filepath", "not-exists-file", 0) + relaySubDirUpdated(ctx, watcherInterval, "/not-exists-directory", "/not-exists-filepath", "not-exists-file", 0, updatePathCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + err := <-errCh c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") - c.Assert(upNotExist, Equals, "") // create relay log dir subDir := c.MkDir() @@ -325,18 +330,22 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { } // b. relay file not found - upNotExist, err = relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0) + relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0, updatePathCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + err = <-errCh c.Assert(err, ErrorMatches, ".*not found.*") - c.Assert(upNotExist, Equals, "") // create the first relay file err = ioutil.WriteFile(relayPaths[0], nil, 0600) c.Assert(err, IsNil) // c. latest file path not exist - upNotExist, err = relaySubDirUpdated(ctx, watcherInterval, subDir, "/no-exists-filepath", relayFiles[0], 0) + relaySubDirUpdated(ctx, watcherInterval, subDir, "/no-exists-filepath", relayFiles[0], 0, updatePathCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + err = <-errCh c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") - c.Assert(upNotExist, Equals, "") // 1. 
file increased when adding watching err = ioutil.WriteFile(relayPaths[0], data, 0600) @@ -345,8 +354,10 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { wg.Add(1) go func() { defer wg.Done() - up, err2 := relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0) - c.Assert(err2, IsNil) + relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0, updatePathCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + up := <-updatePathCh c.Assert(up, Equals, relayPaths[0]) }() wg.Wait() @@ -357,9 +368,11 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { wg.Add(1) go func() { defer wg.Done() - up, err2 := relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], size) - c.Assert(err2, ErrorMatches, ".*file size of relay log.*become smaller.*") - c.Assert(up, Equals, "") + relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], size, updatePathCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + err = <-errCh + c.Assert(err, ErrorMatches, ".*file size of relay log.*become smaller.*") }() wg.Wait() @@ -369,8 +382,10 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { wg.Add(1) go func() { defer wg.Done() - up, err2 := relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0) - c.Assert(err2, IsNil) + relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0, updatePathCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + up := <-updatePathCh c.Assert(up, Equals, relayPaths[1]) }() wg.Wait() @@ -379,8 +394,10 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { wg.Add(1) go func() { defer wg.Done() - up, err2 := relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[1], relayFiles[1], 0) - c.Assert(err2, IsNil) + relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[1], relayFiles[1], 0, updatePathCh, 
errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + up := <-updatePathCh c.Assert(up, Equals, relayPaths[1]) }() @@ -394,8 +411,10 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { wg.Add(1) go func() { defer wg.Done() - up, err2 := relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[1], relayFiles[1], size) - c.Assert(err2, IsNil) + relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[1], relayFiles[1], size, updatePathCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + up := <-updatePathCh c.Assert(up, Equals, relayPaths[2]) }() @@ -412,8 +431,10 @@ func (t *testUtilSuite) TestRelaySubDirUpdated(c *C) { wg.Add(1) go func() { defer wg.Done() - up, err2 := relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[2], relayFiles[2], size) - c.Assert(err2, IsNil) + relaySubDirUpdated(ctx, watcherInterval, subDir, relayPaths[2], relayFiles[2], size, updatePathCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + up := <-updatePathCh c.Assert(up, Equals, relayPaths[2]) }() @@ -451,37 +472,47 @@ func (t *testFileSuite) TestNeedSwitchSubDir(c *C) { latestFilePath string latestFileSize int64 data = []byte("binlog file data") + switchCh = make(chan SwitchPath, 1) + errCh = make(chan error, 1) ) + ctx := context.Background() + // invalid UUID in UUIDs, error UUIDs = append(UUIDs, "invalid.uuid") - needSwitch, needReParse, nextUUID, nextBinlogName, err := needSwitchSubDir( - relayDir, currentUUID, latestFilePath, latestFileSize, UUIDs) + t.writeUUIDs(c, relayDir, UUIDs) + needSwitchSubDir(ctx, relayDir, currentUUID, latestFilePath, latestFileSize, switchCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(switchCh), Equals, 0) + err := <-errCh c.Assert(err, ErrorMatches, ".*not valid.*") - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") + UUIDs = 
UUIDs[:len(UUIDs)-1] // remove the invalid UUID + t.writeUUIDs(c, relayDir, UUIDs) // no next UUID, no need switch - needSwitch, needReParse, nextUUID, nextBinlogName, err = needSwitchSubDir( - relayDir, currentUUID, latestFilePath, latestFileSize, UUIDs) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") + newCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + needSwitchSubDir(newCtx, relayDir, currentUUID, latestFilePath, latestFileSize, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) - // no binlog file in next sub directory, error + // no next sub directory currentUUID = UUIDs[0] - needSwitch, needReParse, nextUUID, nextBinlogName, err = needSwitchSubDir( - relayDir, currentUUID, latestFilePath, latestFileSize, UUIDs) + needSwitchSubDir(ctx, relayDir, currentUUID, latestFilePath, latestFileSize, switchCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(switchCh), Equals, 0) + err = <-errCh c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*(no such file or directory|The system cannot find the file specified).*", UUIDs[1])) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") + + // create next sub directory, block + err = os.Mkdir(filepath.Join(relayDir, UUIDs[1]), 0700) + c.Assert(err, IsNil) + newCtx2, cancel2 := context.WithTimeout(ctx, 5*time.Second) + defer cancel2() + needSwitchSubDir(newCtx2, relayDir, currentUUID, latestFilePath, latestFileSize, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) // create a relay log file in the next sub directory nextBinlogPath := filepath.Join(relayDir, UUIDs[1], "mysql-bin.000001") @@ -490,51 +521,28 @@ func (t *testFileSuite) TestNeedSwitchSubDir(c *C) { err = ioutil.WriteFile(nextBinlogPath, nil, 0600) c.Assert(err, IsNil) - 
// latest relay log file not exists, error - latestFilePath = filepath.Join(relayDir, UUIDs[0], "mysql-bin.000001") - needSwitch, needReParse, nextUUID, nextBinlogName, err = needSwitchSubDir( - relayDir, currentUUID, latestFilePath, latestFileSize, UUIDs) - c.Assert(err, ErrorMatches, fmt.Sprintf( - ".*%s.*(no such file or directory|The system cannot find the path specified).*", - regexp.QuoteMeta(latestFilePath))) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") + // switch to the next + latestFileSize = int64(len(data)) + needSwitchSubDir(ctx, relayDir, currentUUID, latestFilePath, latestFileSize, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(switchCh), Equals, 1) + res := <-switchCh + c.Assert(res.nextUUID, Equals, UUIDs[1]) + c.Assert(res.nextBinlogName, Equals, filepath.Base(nextBinlogPath)) +} - // create the latest relay log file - err = os.MkdirAll(filepath.Dir(latestFilePath), 0700) - c.Assert(err, IsNil) - err = ioutil.WriteFile(latestFilePath, data, 0600) - c.Assert(err, IsNil) +func (t *testFileSuite) writeUUIDs(c *C, relayDir string, UUIDs []string) []byte { + indexPath := path.Join(relayDir, utils.UUIDIndexFilename) + var buf bytes.Buffer + for _, uuid := range UUIDs { + _, err := buf.WriteString(uuid) + c.Assert(err, IsNil) + _, err = buf.WriteString("\n") + c.Assert(err, IsNil) + } - // file size not updated, switch to the next - latestFileSize = int64(len(data)) - needSwitch, needReParse, nextUUID, nextBinlogName, err = needSwitchSubDir( - relayDir, currentUUID, latestFilePath, latestFileSize, UUIDs) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsTrue) - c.Assert(needReParse, IsFalse) - c.Assert(nextUUID, Equals, UUIDs[1]) - c.Assert(nextBinlogName, Equals, filepath.Base(nextBinlogPath)) - - // file size increased, parse it again - latestFileSize = 0 - needSwitch, needReParse, nextUUID, nextBinlogName, err = needSwitchSubDir( - relayDir, 
currentUUID, latestFilePath, latestFileSize, UUIDs) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsTrue) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - - // file size decreased, error - latestFileSize = int64(len(data)) + 1 - needSwitch, needReParse, nextUUID, nextBinlogName, err = needSwitchSubDir( - relayDir, currentUUID, latestFilePath, latestFileSize, UUIDs) - c.Assert(err, ErrorMatches, ".*become smaller.*") - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") + // write the index file + err := ioutil.WriteFile(indexPath, buf.Bytes(), 0600) + c.Assert(err, IsNil) + return buf.Bytes() } diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index bb40760f0f..8087b23899 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -556,29 +556,64 @@ func (r *BinlogReader) parseFile( return false, false, latestPos, "", "", nil } - needSwitch, needReParse, nextUUID, nextBinlogName, err = needSwitchSubDir(r.cfg.RelayDir, currentUUID, fullPath, latestPos, r.uuids) - if err != nil { - return false, false, 0, "", "", err - } else if needReParse { - // need to re-parse the current relay log file - return false, true, latestPos, "", "", nil - } else if needSwitch { - // need to switch to next relay sub directory - return true, false, 0, nextUUID, nextBinlogName, nil - } + switchCh := make(chan SwitchPath, 1) + switchErrCh := make(chan error, 1) + updatePathCh := make(chan string, 1) + updateErrCh := make(chan error, 1) + newCtx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup + defer func() { + cancel() + wg.Wait() + }() - updatedPath, err := relaySubDirUpdated(ctx, watcherInterval, relayLogDir, fullPath, relayLogFile, latestPos) - if err != nil { + wg.Add(1) + go func(latestPos int64) { + defer func() { + close(switchCh) + close(switchErrCh) + wg.Done() + }() + needSwitchSubDir(newCtx, r.cfg.RelayDir, 
currentUUID, fullPath, latestPos, switchCh, switchErrCh) + }(latestPos) + + wg.Add(1) + go func(latestPos int64) { + defer func() { + close(updatePathCh) + close(updateErrCh) + wg.Done() + }() + relaySubDirUpdated(newCtx, watcherInterval, relayLogDir, fullPath, relayLogFile, latestPos, updatePathCh, updateErrCh) + }(latestPos) + + select { + case <-ctx.Done(): + return false, false, 0, "", "", nil + case switchResp := <-switchCh: + // wait to ensure old file not updated + pathUpdated := utils.WaitSomething(3, watcherInterval, func() bool { return len(updatePathCh) > 0 }) + if pathUpdated { + // re-parse it + return false, true, latestPos, "", "", nil + } + // update new uuid + if err = r.updateUUIDs(); err != nil { + return false, false, 0, "", "", nil + } + return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, nil + case updatePath := <-updatePathCh: + if strings.HasSuffix(updatePath, relayLogFile) { + // current relay log file updated, need to re-parse it + return false, true, latestPos, "", "", nil + } + // need parse next relay log file or re-collect files + return false, false, latestPos, "", "", nil + case err := <-switchErrCh: + return false, false, 0, "", "", err + case err := <-updateErrCh: return false, false, 0, "", "", err } - - if strings.HasSuffix(updatedPath, relayLogFile) { - // current relay log file updated, need to re-parse it - return false, true, latestPos, "", "", nil - } - - // need parse next relay log file or re-collect files - return false, false, latestPos, "", "", nil } // updateUUIDs re-parses UUID index file and updates UUID list diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 863f471359..8b4ffd417d 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -281,12 +281,12 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { c.Assert(err, IsNil) } - // no valid update for relay sub dir, timeout + // no valid update for relay sub dir, timeout, no error 
ctx1, cancel1 := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel1() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) - c.Assert(errors.Cause(err), Equals, ctx1.Err()) + c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(0)) @@ -374,6 +374,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { // invalid UUID in UUID list, error r.uuids = []string{currentUUID, "invalid.uuid"} + t.writeUUIDs(c, baseDir, r.uuids) ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel1() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( @@ -388,6 +389,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { // next sub dir exits, need to switch r.uuids = []string{currentUUID, switchedUUID} + t.writeUUIDs(c, baseDir, r.uuids) err = os.MkdirAll(nextRelayDir, 0700) c.Assert(err, IsNil) err = ioutil.WriteFile(nextFullPath, replication.BinLogFileHeader, 0600) @@ -433,12 +435,12 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { c.Assert(err, IsNil) defer f.Close() - // file has no data, meet io.EOF error (when reading file header) and ignore it. but will get `context deadline exceeded` error + // file has no data, meet io.EOF error (when reading file header) and ignore it. 
ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel1() needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) - c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded) + c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(0)) @@ -1159,3 +1161,19 @@ func (t *testReaderSuite) uuidListToBytes(c *C, UUIDs []string) []byte { } return buf.Bytes() } + +func (t *testReaderSuite) writeUUIDs(c *C, relayDir string, UUIDs []string) []byte { + indexPath := path.Join(relayDir, utils.UUIDIndexFilename) + var buf bytes.Buffer + for _, uuid := range UUIDs { + _, err := buf.WriteString(uuid) + c.Assert(err, IsNil) + _, err = buf.WriteString("\n") + c.Assert(err, IsNil) + } + + // write the index file + err := ioutil.WriteFile(indexPath, buf.Bytes(), 0600) + c.Assert(err, IsNil) + return buf.Bytes() +} diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index f61769349b..c779db8318 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -311,6 +311,7 @@ const ( codeRelayRemoveFileFail codeRelayPurgeArgsNotValid codePreviousGTIDsNotValid + codeRotateEventWithDifferentServerID ) // Dump unit error code @@ -899,6 +900,7 @@ var ( ErrRelayRemoveFileFail = New(codeRelayRemoveFileFail, ClassRelayUnit, ScopeInternal, LevelHigh, "remove relay log %s %s", "") ErrRelayPurgeArgsNotValid = New(codeRelayPurgeArgsNotValid, ClassRelayUnit, ScopeInternal, LevelHigh, "args (%T) %+v not valid", "") ErrPreviousGTIDsNotValid = New(codePreviousGTIDsNotValid, ClassRelayUnit, ScopeInternal, LevelHigh, "previousGTIDs %s not valid", "") + ErrRotateEventWithDifferentServerID = New(codeRotateEventWithDifferentServerID, ClassRelayUnit, ScopeInternal, LevelHigh, "receive fake rotate event with different server_id", "Please use `resume-relay` command if upstream database has 
changed") // Dump unit error ErrDumpUnitRuntime = New(codeDumpUnitRuntime, ClassDumpUnit, ScopeInternal, LevelHigh, "mydumper/dumpling runs with error, with output (may empty): %s", "") diff --git a/relay/relay.go b/relay/relay.go index 12b1fd19cf..7608398694 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -452,6 +452,19 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo continue } + // fake rotate event + if _, ok := e.Event.(*replication.RotateEvent); ok && e.Header.Timestamp == 0 && e.Header.LogPos == 0 { + isNew, err2 := isNewServer(ctx, r.meta.UUID(), r.db, r.cfg.Flavor) + if err2 != nil { + return err2 + } + // upstream database switch + // report an error, let outer logic handle it + if isNew { + return terror.ErrRotateEventWithDifferentServerID.Generate() + } + } + // 3. save events into file writeTimer := time.Now() r.logger.Debug("writing binlog event", zap.Reflect("header", e.Header)) @@ -588,6 +601,7 @@ func (r *Relay) reSetupMeta(ctx context.Context) error { r.logger.Info("adjusted meta to start pos", zap.Reflect("start pos", pos), zap.Stringer("start pos's binlog gtid", gtid)) r.updateMetricsRelaySubDirIndex() + r.logger.Info("resetup meta", zap.String("uuid", uuid)) return nil } diff --git a/tests/upstream_switch/Dockerfile b/tests/upstream_switch/Dockerfile new file mode 100644 index 0000000000..a6354586ce --- /dev/null +++ b/tests/upstream_switch/Dockerfile @@ -0,0 +1,18 @@ +ARG version +From mysql:$version + +RUN apt-get update && \ + apt-get -y install keepalived net-tools + +ADD init.sh /init.sh +ADD chk_mysql.sh /chk_mysql.sh + +ARG conf +ADD $conf/keepalive.conf /etc/keepalived/keepalived.conf + +RUN chmod +x /init.sh +RUN chmod +x /chk_mysql.sh + +ENTRYPOINT ["/init.sh"] + +CMD ["mysqld"] diff --git a/tests/upstream_switch/case.sh b/tests/upstream_switch/case.sh new file mode 100644 index 0000000000..411e7f54f0 --- /dev/null +++ b/tests/upstream_switch/case.sh @@ -0,0 +1,212 @@ +#!/bin/bash + +set -eu + 
+CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +PATH=$CUR/../_utils:$PATH # for sync_diff_inspector + +source $CUR/lib.sh + +function clean_data() { + echo "-------clean_data--------" + + exec_sql $master_57_host "stop slave;" + exec_sql $slave_57_host "stop slave;" + exec_sql $master_8_host "stop slave;" + exec_sql $slave_8_host "stop slave;" + + exec_sql $master_57_host "drop database if exists db1;" + exec_sql $master_57_host "drop database if exists db2;" + exec_sql $master_57_host "drop database if exists ${db};" + exec_sql $master_57_host "reset master;" + exec_sql $slave_57_host "drop database if exists db1;" + exec_sql $slave_57_host "drop database if exists db2;" + exec_sql $slave_57_host "drop database if exists ${db};" + exec_sql $slave_57_host "reset master;" + exec_sql $master_8_host "drop database if exists db1;" + exec_sql $master_8_host "drop database if exists db2;" + exec_sql $master_8_host "drop database if exists ${db};" + exec_sql $master_8_host "reset master;" + exec_sql $slave_8_host "drop database if exists db1;" + exec_sql $slave_8_host "drop database if exists db2;" + exec_sql $slave_8_host "drop database if exists ${db};" + exec_sql $slave_8_host "reset master;" + exec_tidb $tidb_host "drop database if exists db1;" + exec_tidb $tidb_host "drop database if exists db2;" + exec_tidb $tidb_host "drop database if exists ${db};" +} + +function prepare_binlogs() { + echo "-------prepare_binlogs--------" + + prepare_more_binlogs $master_57_host + prepare_less_binlogs $slave_57_host + prepare_less_binlogs $master_8_host + prepare_more_binlogs $slave_8_host +} + +function setup_replica() { + echo "-------setup_replica--------" + + master_57_status=($(get_master_status $master_57_host)) + slave_57_status=($(get_master_status $slave_57_host)) + master_8_status=($(get_master_status $master_8_host)) + slave_8_status=($(get_master_status $slave_8_host)) + echo "master_57_status" ${master_57_status[@]} + echo "slave_57_status" 
${slave_57_status[@]} + echo "master_8_status" ${master_8_status[@]} + echo "slave_8_status" ${slave_8_status[@]} + + # master <-- slave + change_master_to_pos $slave_57_host $master_57_host ${master_57_status[0]} ${master_57_status[1]} + + # master <--> master + change_master_to_gtid $slave_8_host $master_8_host + change_master_to_gtid $master_8_host $slave_8_host +} + +function run_dm_components() { + echo "-------run_dm_components--------" + + pkill -9 dm-master || true + pkill -9 dm-worker || true + + run_dm_master $WORK_DIR/master $MASTER_PORT $CUR/conf/dm-master.toml + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member" \ + "alive" 1 + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $CUR/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $CUR/conf/dm-worker2.toml + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member" \ + "alive" 1 \ + "free" 2 +} + +function create_sources() { + echo "-------create_sources--------" + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "operate-source create $CUR/conf/source1.yaml" \ + "\"result\": true" 2 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "operate-source create $CUR/conf/source2.yaml" \ + "\"result\": true" 2 +} + +function gen_full_data() { + echo "-------gen_full_data--------" + + exec_sql $host1 "create database ${db};" + exec_sql $host1 "create table ${db}.${tb}(id int primary key, a int);" + for i in $(seq 1 100); do + exec_sql $host1 "insert into ${db}.${tb} values($i,$i);" + done + + exec_sql $host2 "create database ${db};" + exec_sql $host2 "create table ${db}.${tb}(id int primary key, a int);" + for i in $(seq 101 200); do + exec_sql $host2 "insert into ${db}.${tb} values($i,$i);" + done +} + + +function gen_incr_data() { + echo "-------gen_incr_data--------" + + for i in $(seq 201 250); do + exec_sql $host1 "insert into ${db}.${tb} values($i,$i);" + done + for i in $(seq 251 300); do + exec_sql $host2 "insert into ${db}.${tb} 
values($i,$i);" + done + exec_sql $host1 "alter table ${db}.${tb} add column b int;" + exec_sql $host2 "alter table ${db}.${tb} add column b int;" + for i in $(seq 301 350); do + exec_sql $host1 "insert into ${db}.${tb} values($i,$i,$i);" + done + for i in $(seq 351 400); do + exec_sql $host2 "insert into ${db}.${tb} values($i,$i,$i);" + done + + docker-compose -f $CUR/docker-compose.yml pause mysql57_master + docker-compose -f $CUR/docker-compose.yml pause mysql8_master + wait_mysql $host1 2 + wait_mysql $host2 2 + + for i in $(seq 401 450); do + exec_sql $host1 "insert into ${db}.${tb} values($i,$i,$i);" + done + for i in $(seq 451 500); do + exec_sql $host2 "insert into ${db}.${tb} values($i,$i,$i);" + done + exec_sql $host1 "alter table ${db}.${tb} add column c int;" + exec_sql $host2 "alter table ${db}.${tb} add column c int;" + + # make sure relay switch before unpause + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status task_pessimistic -s mysql-replica-02" \ + "relaySubDir.*000002" 1 + + docker-compose -f $CUR/docker-compose.yml unpause mysql8_master + wait_mysql $host2 1 + + for i in $(seq 501 550); do + exec_sql $host1 "insert into ${db}.${tb} values($i,$i,$i,$i);" + done + for i in $(seq 551 600); do + exec_sql $host2 "insert into ${db}.${tb} values($i,$i,$i,$i);" + done +} + +function start_task() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $CUR/conf/task-pessimistic.yaml --remove-meta" \ + "\"result\": true" 3 +} + +function verify_result() { + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status task_pessimistic -s mysql-replica-02" \ + "relaySubDir.*000003" 1 +} + +function clean_task() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task task_pessimistic" \ + "\"result\": true" 3 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "pause-relay -s mysql-replica-02" \ + "\"result\": true" 2 + run_dm_ctl $WORK_DIR 
"127.0.0.1:$MASTER_PORT" \ + "operate-source stop mysql-replica-01" \ + "\"result\": true" 2 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "operate-source stop mysql-replica-02" \ + "\"result\": true" 2 + docker-compose -f $CUR/docker-compose.yml unpause mysql57_master +} + +function check_master() { + wait_mysql $host1 1 + wait_mysql $host2 1 +} + +function test() { + check_master + install_sync_diff + clean_data + prepare_binlogs + setup_replica + gen_full_data + run_dm_components + create_sources + start_task + gen_incr_data + verify_result + clean_task +} + +test diff --git a/tests/upstream_switch/chk_mysql.sh b/tests/upstream_switch/chk_mysql.sh new file mode 100644 index 0000000000..8252daa17e --- /dev/null +++ b/tests/upstream_switch/chk_mysql.sh @@ -0,0 +1,8 @@ +#!/bin/bash +counter=$(netstat -na|grep "LISTEN"|grep "3306"|wc -l) +if [ "${counter}" -eq 0 ] +then + /etc/init.d/keepalived stop +else + exit 0 +fi \ No newline at end of file diff --git a/tests/upstream_switch/conf/diff_config.toml b/tests/upstream_switch/conf/diff_config.toml new file mode 100644 index 0000000000..2be7b1eda4 --- /dev/null +++ b/tests/upstream_switch/conf/diff_config.toml @@ -0,0 +1,41 @@ +use-checksum = true + +# tables need to check. 
+[[check-tables]] +schema = "db_pessimistic" +tables = ["tb"] + +[[table-config]] +schema = "db_pessimistic" +table = "tb" +is-sharding = true + +[[table-config.source-tables]] +instance-id = "mysql-replica-01" +schema = "db_pessimistic" +table = "~t.*" + +[[table-config.source-tables]] +instance-id = "mysql-replica-02" +schema = "db_pessimistic" +table = "~t.*" + +[[source-db]] +host = "172.28.128.2" +port = 3306 +user = "root" +password = "123456" +instance-id = "mysql-replica-01" + +[[source-db]] +host = "172.28.128.3" +port = 3306 +user = "root" +password = "123456" +instance-id = "mysql-replica-02" + +[target-db] +host = "172.28.128.8" +port = 4000 +user = "root" +password = "" diff --git a/tests/upstream_switch/conf/dm-master.toml b/tests/upstream_switch/conf/dm-master.toml new file mode 100644 index 0000000000..aa24d7d7d3 --- /dev/null +++ b/tests/upstream_switch/conf/dm-master.toml @@ -0,0 +1,6 @@ +# Master Configuration. +name = "master1" +master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" +peer-urls = "127.0.0.1:8291" +initial-cluster = "master1=http://127.0.0.1:8291" diff --git a/tests/upstream_switch/conf/dm-worker1.toml b/tests/upstream_switch/conf/dm-worker1.toml new file mode 100644 index 0000000000..6f1d1b5344 --- /dev/null +++ b/tests/upstream_switch/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = "worker1" +join = "127.0.0.1:8261" \ No newline at end of file diff --git a/tests/upstream_switch/conf/dm-worker2.toml b/tests/upstream_switch/conf/dm-worker2.toml new file mode 100644 index 0000000000..8394916268 --- /dev/null +++ b/tests/upstream_switch/conf/dm-worker2.toml @@ -0,0 +1,2 @@ +name = "worker2" +join = "127.0.0.1:8261" \ No newline at end of file diff --git a/tests/upstream_switch/conf/source1.yaml b/tests/upstream_switch/conf/source1.yaml new file mode 100644 index 0000000000..dba6304460 --- /dev/null +++ b/tests/upstream_switch/conf/source1.yaml @@ -0,0 +1,9 @@ +source-id: "mysql-replica-01" +enable-gtid: true +enable-relay: false + 
+from: + host: "172.28.128.2" + user: "root" + password: "123456" + port: 3306 diff --git a/tests/upstream_switch/conf/source2.yaml b/tests/upstream_switch/conf/source2.yaml new file mode 100644 index 0000000000..6d30af5b67 --- /dev/null +++ b/tests/upstream_switch/conf/source2.yaml @@ -0,0 +1,9 @@ +source-id: "mysql-replica-02" +enable-gtid: true +enable-relay: true + +from: + host: "172.28.128.3" + user: "root" + password: "123456" + port: 3306 diff --git a/tests/upstream_switch/conf/task-pessimistic.yaml b/tests/upstream_switch/conf/task-pessimistic.yaml new file mode 100644 index 0000000000..ac1df0437a --- /dev/null +++ b/tests/upstream_switch/conf/task-pessimistic.yaml @@ -0,0 +1,28 @@ +--- +name: "task_pessimistic" +task-mode: all +shard-mode: pessimistic + +target-database: + host: "172.28.128.8" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - + source-id: "mysql-replica-01" + black-white-list: "instance" + mydumper-thread: 4 + loader-thread: 16 + syncer-thread: 16 + - + source-id: "mysql-replica-02" + black-white-list: "instance" + mydumper-thread: 4 + loader-thread: 16 + syncer-thread: 16 + +black-white-list: + instance: + do-dbs: ["db_pessimistic"] diff --git a/tests/upstream_switch/docker-compose.yml b/tests/upstream_switch/docker-compose.yml new file mode 100644 index 0000000000..7a627d1a61 --- /dev/null +++ b/tests/upstream_switch/docker-compose.yml @@ -0,0 +1,80 @@ +version: "3.7" + +networks: + db-networks: + ipam: # for static IP + config: + - subnet: "172.28.128.0/24" + +# re-usable extension fields, https://docs.docker.com/compose/compose-file/#extension-fields. +x-node: &default-node + privileged: true + restart: always + environment: + MYSQL_ROOT_PASSWORD: "123456" + +services: + mysql57_master: + <<: *default-node + build: + context: . 
+ args: + conf: mysql57_master + version: 5.7 + command: --default-authentication-plugin=mysql_native_password --log-bin=/var/lib/mysql/mysql-bin --log_slave_updates=ON --server-id=1 --binlog-format=ROW --gtid_mode=ON --enforce-gtid-consistency=true + container_name: mysql57_master + hostname: mysql57_master + networks: + db-networks: + ipv4_address: 172.28.128.4 + + mysql57_slave: + <<: *default-node + build: + context: . + args: + conf: mysql57_slave + version: 5.7 + command: --default-authentication-plugin=mysql_native_password --log-bin=/var/lib/mysql/mysql-bin --log_slave_updates=ON --server-id=2 --binlog-format=ROW --gtid_mode=ON --enforce-gtid-consistency=true + container_name: mysql57_slave + hostname: mysql57_slave + networks: + db-networks: + ipv4_address: 172.28.128.5 + + mysql8_master: + <<: *default-node + build: + context: . + args: + conf: mysql8_master + version: 8 + command: --default-authentication-plugin=mysql_native_password --log-bin=/var/lib/mysql/mysql-bin --log_slave_updates=ON --server-id=1 --binlog-format=ROW --gtid_mode=ON --enforce-gtid-consistency=true + container_name: mysql8_master + hostname: mysql8_master + networks: + db-networks: + ipv4_address: 172.28.128.6 + + mysql8_slave: + <<: *default-node + build: + context: . 
+ args: + conf: mysql8_slave + version: 8 + command: --default-authentication-plugin=mysql_native_password --log-bin=/var/lib/mysql/mysql-bin --log_slave_updates=ON --server-id=2 --binlog-format=ROW --gtid_mode=ON --enforce-gtid-consistency=true + container_name: mysql8_slave + hostname: mysql8_slave + networks: + db-networks: + ipv4_address: 172.28.128.7 + + tidb: # downstream TiDB + container_name: tidb + hostname: tidb + networks: + db-networks: + ipv4_address: 172.28.128.8 + image: pingcap/tidb:latest + restart: always \ No newline at end of file diff --git a/tests/upstream_switch/init.sh b/tests/upstream_switch/init.sh new file mode 100644 index 0000000000..a4afb2f57d --- /dev/null +++ b/tests/upstream_switch/init.sh @@ -0,0 +1,20 @@ +#!/bin/sh + +# call the MySQL image provided entrypoint script +# "$@" is to pass all parameters as they are provided +/entrypoint.sh "$@" & + +i=0 +while [ "$(netstat -na|grep "LISTEN"|grep "3306"|wc -l)" -eq 0 ]; do + echo "wait mysql" + i=$((i+1)) + if [ "$i" -gt 100 ]; then + echo "wait for mysql timeout" + exit 1 + fi + sleep 1 +done + +service keepalived start + +tail -f /dev/null \ No newline at end of file diff --git a/tests/upstream_switch/lib.sh b/tests/upstream_switch/lib.sh new file mode 100644 index 0000000000..fe76e74820 --- /dev/null +++ b/tests/upstream_switch/lib.sh @@ -0,0 +1,102 @@ +#!/bin/bash + +set -eu + +export TEST_DIR=/tmp/dm_test +export TEST_NAME="upstream_switch" + +WORK_DIR=$TEST_DIR/$TEST_NAME +rm -rf $WORK_DIR +mkdir -p $WORK_DIR + +db="db_pessimistic" +tb="tb" +host1="172.28.128.2" +host2="172.28.128.3" +master_57_host="172.28.128.4" +slave_57_host="172.28.128.5" +master_8_host="172.28.128.6" +slave_8_host="172.28.128.7" +tidb_host="172.28.128.8" +MASTER_PORT=8261 +WORKER1_PORT=8262 +WORKER2_PORT=8263 + +function exec_sql() { + echo $2 | MYSQL_PWD=123456 mysql -uroot -h$1 -P3306 +} + +function exec_tidb() { + echo $2 | mysql -uroot -h$1 -P4000 +} + +function install_sync_diff() { + curl 
http://download.pingcap.org/tidb-enterprise-tools-latest-linux-amd64.tar.gz | tar xz + mkdir -p bin + mv tidb-enterprise-tools-latest-linux-amd64/bin/sync_diff_inspector bin/ +} + + +function prepare_more_binlogs() { + exec_sql $1 "create database db1;" + exec_sql $1 "flush logs;" + exec_sql $1 "create table db1.tb1(id int);" + exec_sql $1 "flush logs;" + exec_sql $1 "insert into db1.tb1 values(1);" + exec_sql $1 "insert into db1.tb1 values(2),(3),(4);" +} + +function prepare_less_binlogs() { + exec_sql $1 "create database db2;" + exec_sql $1 "flush logs;" + exec_sql $1 "create table db2.tb2(id int);" + exec_sql $1 "insert into db2.tb2 values(1);" + exec_sql $1 "insert into db2.tb2 values(2),(3);" +} + + +function get_master_status() { + arr=$(echo "show master status;" | MYSQL_PWD=123456 mysql -uroot -h$1 -P3306 | awk 'NR==2' ) + echo $arr +} + +function change_master_to_pos() { + exec_sql $1 "stop slave;" + echo "change master to master_host='$2',master_user='root',master_password='123456',master_log_file='$3',master_log_pos=$4;" + exec_sql $1 "change master to master_host='$2',master_user='root',master_password='123456',master_log_file='$3',master_log_pos=$4;" + exec_sql $1 "start slave;" +} + +function change_master_to_gtid() { + exec_sql $1 "stop slave;" + exec_sql $1 "change master to master_host='$2',master_user='root',master_password='123456',master_auto_position=1;" + exec_sql $1 "start slave;" +} + +function wait_mysql() { + echo "-------wait_mysql--------" + + i=0 + while ! 
mysqladmin -h$1 -P3306 -uroot ping --connect-timeout=1 > /dev/null 2>&1 ; do + echo "wait mysql" + i=$((i+1)) + if [ "$i" -gt 20 ]; then + echo "wait for mysql $1:3306 timeout" + exit 1 + fi + sleep 1 + done + i=0 + + server_id=$(echo "show variables like 'server_id';" | MYSQL_PWD=123456 mysql -uroot -h$1 -P3306 | awk 'NR==2' | awk '{print $2}') + while [ "$server_id" != $2 ]; do + echo "wait server_id" + i=$((i+1)) + if [ "$i" -gt 20 ]; then + echo "different server_id: $server_id, expect: $2, host: $1" + exit 1 + fi + sleep 1 + server_id=$(echo "show variables like 'server_id';" | MYSQL_PWD=123456 mysql -uroot -h$1 -P3306 | awk 'NR==2' | awk '{print $2}') + done +} diff --git a/tests/upstream_switch/mysql57_master/keepalive.conf b/tests/upstream_switch/mysql57_master/keepalive.conf new file mode 100644 index 0000000000..1ee9c9da12 --- /dev/null +++ b/tests/upstream_switch/mysql57_master/keepalive.conf @@ -0,0 +1,25 @@ +vrrp_script chk_mysql_port { + script "/chk_mysql.sh" + interval 2 +} + +vrrp_instance VI_1 { + state MASTER + interface eth0 + virtual_router_id 33 + priority 200 + advert_int 1 + + authentication { + auth_type PASS + auth_pass 123456 + } + + virtual_ipaddress { + 172.28.128.2/24 dev eth0 + } + + track_script { + chk_mysql_port + } +} \ No newline at end of file diff --git a/tests/upstream_switch/mysql57_slave/keepalive.conf b/tests/upstream_switch/mysql57_slave/keepalive.conf new file mode 100644 index 0000000000..ed04f20f8b --- /dev/null +++ b/tests/upstream_switch/mysql57_slave/keepalive.conf @@ -0,0 +1,25 @@ +vrrp_script chk_mysql_port { + script "/chk_mysql.sh" + interval 2 +} + +vrrp_instance VI_1 { + state BACKUP + interface eth0 + virtual_router_id 33 + priority 100 + advert_int 1 + + authentication { + auth_type PASS + auth_pass 123456 + } + + virtual_ipaddress { + 172.28.128.2/24 dev eth0 + } + + track_script { + chk_mysql_port + } +} \ No newline at end of file diff --git a/tests/upstream_switch/mysql8_master/keepalive.conf 
b/tests/upstream_switch/mysql8_master/keepalive.conf new file mode 100644 index 0000000000..e9bac63202 --- /dev/null +++ b/tests/upstream_switch/mysql8_master/keepalive.conf @@ -0,0 +1,25 @@ +vrrp_script chk_mysql_port { + script "/chk_mysql.sh" + interval 2 +} + +vrrp_instance VI_1 { + state MASTER + interface eth0 + virtual_router_id 33 + priority 200 + advert_int 1 + + authentication { + auth_type PASS + auth_pass 123456 + } + + virtual_ipaddress { + 172.28.128.3/24 dev eth0 + } + + track_script { + chk_mysql_port + } +} \ No newline at end of file diff --git a/tests/upstream_switch/mysql8_slave/keepalive.conf b/tests/upstream_switch/mysql8_slave/keepalive.conf new file mode 100644 index 0000000000..22a6e2e179 --- /dev/null +++ b/tests/upstream_switch/mysql8_slave/keepalive.conf @@ -0,0 +1,25 @@ +vrrp_script chk_mysql_port { + script "/chk_mysql.sh" + interval 2 +} + +vrrp_instance VI_1 { + state BACKUP + interface eth0 + virtual_router_id 33 + priority 100 + advert_int 1 + + authentication { + auth_type PASS + auth_pass 123456 + } + + virtual_ipaddress { + 172.28.128.3/24 dev eth0 + } + + track_script { + chk_mysql_port + } +} \ No newline at end of file