Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

auto master-slave switch #1364

Merged
merged 35 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
798a527
syncer auto switch relay subdir when master-slave switch
GMHDBJD Dec 28, 2020
03f1572
init docker file
GMHDBJD Dec 30, 2020
832e842
cache go modules and tools
GMHDBJD Dec 30, 2020
d686a34
change working directory
GMHDBJD Dec 30, 2020
e7a2fd6
rename notification
GMHDBJD Dec 30, 2020
4c561d8
upload logs
GMHDBJD Dec 30, 2020
d393779
minor fix
GMHDBJD Dec 30, 2020
811d7d2
remove env
GMHDBJD Dec 30, 2020
cc85d27
change order
GMHDBJD Dec 30, 2020
cc8ae6f
minor update
GMHDBJD Dec 30, 2020
9348a6c
add test case for upstream database switch
GMHDBJD Dec 31, 2020
7cce42c
detect fake rotate event
GMHDBJD Dec 31, 2020
d3b1f89
speed up build
GMHDBJD Dec 31, 2020
98d7ff1
wait server-id
GMHDBJD Dec 31, 2020
c03f87d
minor fix
GMHDBJD Dec 31, 2020
77990ca
ignore mysql warn
GMHDBJD Dec 31, 2020
b4d3e6e
debug
GMHDBJD Dec 31, 2020
f3e66a1
fix log dir
GMHDBJD Dec 31, 2020
0202c92
debug
GMHDBJD Dec 31, 2020
9d96a2a
fix logs
GMHDBJD Dec 31, 2020
6eb8ea0
remove password
GMHDBJD Dec 31, 2020
5f8b42d
Revert "debug"
GMHDBJD Dec 31, 2020
2562bc2
Revert "debug"
GMHDBJD Dec 31, 2020
de14021
minor fix
GMHDBJD Dec 31, 2020
306c566
check data before sync
GMHDBJD Dec 31, 2020
687919e
less data
GMHDBJD Dec 31, 2020
828572e
make sure relay switch before unpause
GMHDBJD Jan 4, 2021
2530587
address comment
GMHDBJD Jan 6, 2021
20f40cd
wait to ensure old file not updated
GMHDBJD Jan 6, 2021
f2221fa
Merge branch 'master' into switchNextDir
GMHDBJD Jan 18, 2021
5aa6ff4
Merge branch 'master' into switchNextDir
GMHDBJD Jan 22, 2021
76ddfed
Update pkg/terror/error_list.go
lance6716 Feb 2, 2021
a5a543c
Merge branch 'master' into switchNextDir
lance6716 Feb 2, 2021
04da538
Update errors.toml
lance6716 Feb 3, 2021
d74688d
Update _utils/terror_gen/errors_release.txt
lance6716 Feb 3, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/upgrade-via-tiup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ jobs:
SLACK_WEBHOOK: ${{ secrets.SLACK_NOTIFY }}
uses: Ilshidur/action-slack@2.1.0
with:
args: "chaos job failed, see https://github.com/pingcap/dm/actions/runs/{{ GITHUB_RUN_ID }}"
args: "upgrade-via-tiup job failed, see https://github.com/pingcap/dm/actions/runs/{{ GITHUB_RUN_ID }}"

# Debug via SSH if previous steps failed
- name: Set up tmate session
Expand Down
83 changes: 83 additions & 0 deletions .github/workflows/upstream-switch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
name: Upstream database switch

on:
push:
branches:
- master
- release-2.0
pull_request:
branches:
- master
- release-2.0

jobs:
upstream-database-switch:
name: upstream-database-switch
runs-on: ubuntu-18.04

steps:
- name: Set up Go 1.13
uses: actions/setup-go@v2
with:
go-version: 1.13

- name: Check out code
uses: actions/checkout@v2

- name: Cache go modules
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-dm-${{ hashFiles('**/go.sum') }}

- name: Cache tools
uses: actions/cache@v2
with:
path: |
**/tools
key: ${{ runner.os }}-dm-tools-${{ hashFiles('**/tools/go.sum') }}

- name: Build DM binary
run: make dm_integration_test_build

- name: Setup containers
run: |
docker-compose -f ./tests/upstream_switch/docker-compose.yml up -d

- name: Run test cases
run: |
bash ./tests/upstream_switch/case.sh

- name: Copy logs to hack permission
if: ${{ always() }}
run: |
mkdir ./logs
sudo cp -r -L /tmp/dm_test/upstream_switch/master/log ./logs/master
sudo cp -r -L /tmp/dm_test/upstream_switch/worker1/log ./logs/worker1
sudo cp -r -L /tmp/dm_test/upstream_switch/worker2/log ./logs/worker2
sudo chown -R runner ./logs

# Update logs as artifact seems not stable, so we set `continue-on-error: true` here.
- name: Upload logs
continue-on-error: true
uses: actions/upload-artifact@v2
if: ${{ always() }}
with:
name: upstream-switch-logs
path: |
./logs

# send Slack notify if failed.
# NOTE: With the exception of `GITHUB_TOKEN`, secrets are not passed to the runner when a workflow is triggered from a forked repository.
- name: Slack notification
if: ${{ failure() }}
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_NOTIFY }}
uses: Ilshidur/action-slack@2.1.0
with:
args: "upstream-switch job failed, see https://github.com/pingcap/dm/actions/runs/{{ GITHUB_RUN_ID }}"

# Debug via SSH if previous steps failed
- name: Set up tmate session
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v2
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ ErrRelayTrimUUIDNotFound,[code=30040:class=relay-unit:scope=internal:level=high]
ErrRelayRemoveFileFail,[code=30041:class=relay-unit:scope=internal:level=high], "Message: remove relay log %s %s"
ErrRelayPurgeArgsNotValid,[code=30042:class=relay-unit:scope=internal:level=high], "Message: args (%T) %+v not valid"
ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high], "Message: previousGTIDs %s not valid"
ErrRotateEventWithDifferentServerID,[code=30044:class=relay-unit:scope=internal:level=high], "Message: receive fake rotate event with differend server_id, Workaround: Please use `resume-relay` command if upstream database has changed"
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high], "Message: mydumper/dumpling runs with error, with output (may empty): %s"
ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file."
ErrDumpUnitGenBAList,[code=32003:class=dump-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-relay-unit-30044]
message = "receive fake rotate event with differend server_id"
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
description = ""
workaround = "Please use `resume-relay` command if upstream database has changed"
tags = ["internal", "high"]

[error.DM-dump-unit-32001]
message = "mydumper/dumpling runs with error, with output (may empty): %s"
description = ""
Expand Down
121 changes: 81 additions & 40 deletions pkg/streamer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package streamer
import (
"context"
"os"
"path"
"path/filepath"
"sort"
"strings"
Expand All @@ -42,6 +43,12 @@ const (
FileCmpBigger
)

// SwitchPath represents next binlog file path which should be switched
type SwitchPath struct {
nextUUID string
nextBinlogName string
}

// CollectAllBinlogFiles collects all valid binlog files in dir
func CollectAllBinlogFiles(dir string) ([]string, error) {
if dir == "" {
Expand Down Expand Up @@ -189,20 +196,29 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) {
// relaySubDirUpdated checks whether the relay sub directory updated
// including file changed, created, removed, etc.
func relaySubDirUpdated(ctx context.Context, watcherInterval time.Duration, dir string,
latestFilePath, latestFile string, latestFileSize int64) (string, error) {
latestFilePath, latestFile string, latestFileSize int64, updatePathCh chan string, errCh chan error) {
var err error
defer func() {
if err != nil {
errCh <- err
}
}()

// create polling watcher
watcher2 := watcher.NewWatcher()

// Add before Start
// no need to Remove, it will be closed and release when return
err := watcher2.Add(dir)
err = watcher2.Add(dir)
if err != nil {
return "", terror.ErrAddWatchForRelayLogDir.Delegate(err, dir)
err = terror.ErrAddWatchForRelayLogDir.Delegate(err, dir)
return
}

err = watcher2.Start(watcherInterval)
if err != nil {
return "", terror.ErrWatcherStart.Delegate(err, dir)
err = terror.ErrWatcherStart.Delegate(err, dir)
return
}
defer watcher2.Close()

Expand Down Expand Up @@ -269,60 +285,85 @@ func relaySubDirUpdated(ctx context.Context, watcherInterval time.Duration, dir
// try collect newer relay log file to check whether newer exists before watching
newerFiles, err := CollectBinlogFilesCmp(dir, latestFile, FileCmpBigger)
if err != nil {
return "", terror.Annotatef(err, "collect newer files from %s in dir %s", latestFile, dir)
err = terror.Annotatef(err, "collect newer files from %s in dir %s", latestFile, dir)
return
}

// check the latest relay log file whether updated when adding watching and collecting newer
cmp, err := fileSizeUpdated(latestFilePath, latestFileSize)
if err != nil {
return "", err
return
} else if cmp < 0 {
return "", terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath)
err = terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath)
return
} else if cmp > 0 {
// the latest relay log file already updated, need to parse from it again (not need to re-collect relay log files)
return latestFilePath, nil
updatePathCh <- latestFilePath
return
} else if len(newerFiles) > 0 {
// check whether newer relay log file exists
nextFilePath := filepath.Join(dir, newerFiles[0])
log.L().Info("newer relay log file is already generated, start parse from it", zap.String("new file", nextFilePath))
return nextFilePath, nil
updatePathCh <- nextFilePath
return
}

res := <-result
return res.updatePath, res.err
if res.err != nil {
err = res.err
return
}

updatePathCh <- res.updatePath
}

// needSwitchSubDir checks whether the reader need to switch to next relay sub directory
func needSwitchSubDir(relayDir, currentUUID, latestFilePath string, latestFileSize int64, UUIDs []string) (
needSwitch, needReParse bool, nextUUID string, nextBinlogName string, err error) {
nextUUID, _, err = getNextUUID(currentUUID, UUIDs)
if err != nil {
return false, false, "", "", terror.Annotatef(err, "current UUID %s, UUIDs %v", currentUUID, UUIDs)
} else if len(nextUUID) == 0 {
// no next sub dir exists, not need to switch
return false, false, "", "", nil
}
func needSwitchSubDir(ctx context.Context, relayDir, currentUUID, latestFilePath string, latestFileSize int64, switchCh chan SwitchPath, errCh chan error) {
var (
err error
nextUUID string
nextBinlogName string
uuids []string
)

ticker := time.NewTicker(watcherInterval)
defer func() {
ticker.Stop()
if err != nil {
errCh <- err
}
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// reload uuid
uuids, err = utils.ParseUUIDIndex(path.Join(relayDir, utils.UUIDIndexFilename))
if err != nil {
return
}
nextUUID, _, err = getNextUUID(currentUUID, uuids)
if err != nil {
return
}
if len(nextUUID) == 0 {
continue
}

// try get the first binlog file in next sub directory
nextBinlogName, err = getFirstBinlogName(relayDir, nextUUID)
if err != nil {
// NOTE: current we can not handle `errors.IsNotFound(err)` easily
// because creating sub directory and writing relay log file are not atomic
// so we let user to pause syncing before switching relay's master server
return false, false, "", "", err
}
// try get the first binlog file in next sub directory
nextBinlogName, err = getFirstBinlogName(relayDir, nextUUID)
if err != nil {
// because creating sub directory and writing relay log file are not atomic
// just continue to observe subdir
if terror.ErrBinlogFilesNotFound.Equal(err) {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
err = nil
continue
}
return
}

// check the latest relay log file whether updated when checking next sub directory
cmp, err := fileSizeUpdated(latestFilePath, latestFileSize)
if err != nil {
return false, false, "", "", err
} else if cmp < 0 {
return false, false, "", "", terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath)
} else if cmp > 0 {
// the latest relay log file already updated, need to parse from it again (not need to switch to sub directory)
return false, true, "", "", nil
switchCh <- SwitchPath{nextUUID, nextBinlogName}
return
}
}

// need to switch to next sub directory
return true, false, nextUUID, nextBinlogName, nil
}
Loading