etcd: fix json unmarshall error and sources not found after upgrade to 2.0.2 (#1635) #1649

Merged
54 changes: 49 additions & 5 deletions .github/workflows/upgrade-via-tiup.yml
@@ -1,6 +1,10 @@
name: Upgrade via TiUP

on:
pull_request:
branches:
- master
- release-2.0
schedule:
- cron: '3 22 * * *' # run daily at 06:03 UTC+8 (22:03 UTC)
workflow_dispatch:
@@ -35,7 +39,7 @@ jobs:
GOPATH=${GITHUB_WORKSPACE}/go docker-compose up -d

- name: Run test cases
working-directory: ${{ env.working-directoryr }}
working-directory: ${{ env.working-directory }}
run: |
cd ${{ env.working-directory }}/tests/tiup/docker
docker-compose exec -T control bash -c "cd /go/src/github.com/pingcap/dm && ./tests/tiup/upgrade-from-v1.sh"
@@ -63,7 +67,7 @@ jobs:
strategy:
fail-fast: false
matrix:
previous_v2: ["v2.0.0", "v2.0.1"]
previous_v2: ["v2.0.0", "v2.0.1", "v2.0.2"]

steps:

@@ -76,19 +80,59 @@ jobs:
uses: actions/checkout@v2
with:
path: go/src/github.com/pingcap/dm

- name: Build
if: ${{ github.ref != 'refs/heads/main' }}
working-directory: ${{ env.working-directory }}
run: make build nolint=true

- name: Package files
if: ${{ github.ref != 'refs/heads/main' }}
run: |
mkdir ${{ github.workspace }}/package
cd ${{ github.workspace }}/package

echo "package dm-master"
mkdir dm-master
cp ${{ env.working-directory }}/bin/dm-master dm-master
cp -r ${{ env.working-directory }}/dm/dm-ansible/conf dm-master
cp -r ${{ env.working-directory }}/dm/dm-ansible/scripts dm-master
tar -czvf dm-master-nightly-linux-amd64.tar.gz dm-master

echo "package dm-worker"
mkdir dm-worker
cp ${{ env.working-directory }}/bin/dm-worker dm-worker
cp -r ${{ env.working-directory }}/dm/dm-ansible/conf dm-worker/conf
cp -r ${{ env.working-directory }}/dm/dm-ansible/scripts dm-worker/scripts
tar -czvf dm-worker-nightly-linux-amd64.tar.gz dm-worker

echo "package dmctl"
mkdir dmctl
cp ${{ env.working-directory }}/bin/dmctl dmctl
cp -r ${{ env.working-directory }}/dm/dm-ansible/conf dmctl/conf
cp -r ${{ env.working-directory }}/dm/dm-ansible/scripts dmctl/scripts
tar -czvf dmctl-nightly-linux-amd64.tar.gz dmctl

- name: Setup containers
working-directory: ${{ env.working-directory }}
run: |
cd ${{ env.working-directory }}/tests/tiup/docker
GOPATH=${GITHUB_WORKSPACE}/go docker-compose up -d

- name: Copy package files
if: ${{ github.ref != 'refs/heads/main' }}
run: |
cd ${{ github.workspace }}/package
docker cp dm-master-nightly-linux-amd64.tar.gz control:/tmp
docker cp dm-worker-nightly-linux-amd64.tar.gz control:/tmp
docker cp dmctl-nightly-linux-amd64.tar.gz control:/tmp

# TODO: support more CUR_VER
- name: Run test cases
working-directory: ${{ env.working-directoryr }}
working-directory: ${{ env.working-directory }}
run: |
cd ${{ env.working-directory }}/tests/tiup/docker
docker-compose exec -T control bash -c "cd /go/src/github.com/pingcap/dm && ./tests/tiup/upgrade-from-v2.sh ${{ matrix.previous_v2 }} nightly"
docker-compose exec -e ref=${{ github.ref }} -T control bash -c "cd /go/src/github.com/pingcap/dm && ./tests/tiup/upgrade-from-v2.sh ${{ matrix.previous_v2 }} nightly"

# send a Slack notification if failed.
# NOTE: With the exception of `GITHUB_TOKEN`, secrets are not passed to the runner when a workflow is triggered from a forked repository.
Expand Down Expand Up @@ -131,7 +175,7 @@ jobs:
GOPATH=${GITHUB_WORKSPACE}/go docker-compose up -d

- name: Run test cases before upgrade
working-directory: ${{ env.working-directoryr }}
working-directory: ${{ env.working-directory }}
run: |
cd ${{ env.working-directory }}/tests/tiup/docker
docker-compose exec -T control bash -c "cd /go/src/github.com/pingcap/dm && ./tests/tiup/upgrade-tidb.sh before_upgrade nightly"
10 changes: 10 additions & 0 deletions dm/master/bootstrap.go
@@ -70,6 +70,16 @@ func (s *Server) bootstrap(ctx context.Context) error {
return nil
}

func (s *Server) bootstrapBeforeSchedulerStart(ctx context.Context) error {
log.L().Info("start before scheduler start")
// not needed when upgrading from v1.0.x
if s.cfg.V1SourcesPath != "" {
return nil
}

return upgrade.TryUpgradeBeforeSchedulerStart(ctx, s.etcdClient)
}
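
The hook above runs while a member is becoming leader but before the scheduler loads sources and tasks from etcd, so data written by an older release can be handled first. Below is a minimal sketch of the shape such a version-gated, idempotent hook could take (this is not the actual `TryUpgradeBeforeSchedulerStart`; the key name and the `runUpgrades` helper are hypothetical):

```go
package upgrade

import (
	"context"

	"go.etcd.io/etcd/clientv3"
)

// clusterVersionKey is a hypothetical etcd key recording the cluster version.
const clusterVersionKey = "/dm-cluster/version"

// TryUpgradeBeforeSchedulerStart sketches a pre-scheduler upgrade hook: it is
// gated on the recorded cluster version, so calling it again after a
// successful upgrade (e.g. on re-election) is a no-op.
func TryUpgradeBeforeSchedulerStart(ctx context.Context, cli *clientv3.Client) error {
	resp, err := cli.Get(ctx, clusterVersionKey)
	if err != nil {
		return err
	}
	if len(resp.Kvs) == 0 {
		return nil // fresh cluster, nothing to migrate yet
	}
	if string(resp.Kvs[0].Value) >= "v2.0.2" { // naive compare, sketch only
		return nil // already upgraded
	}
	// Run whatever migrations must finish before the scheduler reads etcd,
	// then bump the recorded version so re-elections don't repeat the work.
	if err := runUpgrades(ctx, cli); err != nil {
		return err
	}
	_, err = cli.Put(ctx, clusterVersionKey, "v2.0.2")
	return err
}

// runUpgrades is a stand-in for the real migration steps.
func runUpgrades(ctx context.Context, cli *clientv3.Client) error { return nil }
```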

// importFromV10x tries to import/upgrade the cluster from v1.0.x.
func (s *Server) importFromV10x(ctx context.Context) error {
// 1. check whether need to upgrade based on the cluster version.
11 changes: 10 additions & 1 deletion dm/master/election.go
@@ -57,6 +57,15 @@ func (s *Server) electionNotify(ctx context.Context) {
log.L().Info("current member become the leader", zap.String("current member", s.cfg.Name))
s.leader.Store(oneselfStartingLeader)

// try to upgrade the cluster before scheduler start
err := s.bootstrapBeforeSchedulerStart(ctx)
if err != nil {
log.L().Error("fail to bootstrap the cluster before scheduler start", zap.Error(err))
s.retireLeader()
s.election.Resign()
continue
}

// NOTE: for logic errors, we should return with `true`, so that the cluster can serve requests and the user can fix these errors.
// otherwise no member of DM-master can become the leader and the user can't fix them (the cluster may need to be fixed offline with some other tools like etcdctl).
ok := s.startLeaderComponent(ctx)
@@ -76,7 +85,7 @@ func (s *Server) electionNotify(ctx context.Context) {
// so if the old leader failed when upgrading, the new leader can try again.
// NOTE: if the cluster has been upgraded, calling this method again should have no side effects.
// NOTE: now, bootstrap relies on scheduler to handle DM-worker instances, sources, tasks, etc.
err := s.bootstrap(ctx)
err = s.bootstrap(ctx)
if err != nil {
log.L().Error("fail to bootstrap the cluster", zap.Error(err))
s.retireLeader()
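
Two details in this hunk are easy to miss: `err := s.bootstrap(ctx)` becomes `err = s.bootstrap(ctx)` because `err` is already declared by the new pre-scheduler call earlier in the same scope (a second short declaration of `err` alone would not compile), and a failed pre-scheduler bootstrap now retires and resigns instead of serving, so another DM-master can campaign and retry the upgrade. A runnable toy model of that retry-through-re-election shape (not the DM source; all names are illustrative):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// leaderLoop is a toy model of electionNotify: each received campaign makes
// this member the leader; a failed pre-scheduler bootstrap retires and
// resigns instead of serving, so a later campaign can retry the upgrade.
func leaderLoop(ctx context.Context, campaigns <-chan struct{},
	bootstrapBefore, startComponents func(context.Context) error,
	retire, resign func()) {
	for range campaigns {
		if err := bootstrapBefore(ctx); err != nil {
			fmt.Println("pre-scheduler bootstrap failed:", err)
			retire()
			resign() // hand leadership back; the next campaign retries
			continue
		}
		if err := startComponents(ctx); err != nil {
			retire()
			resign()
			continue
		}
		fmt.Println("leader is serving")
		return
	}
}

func main() {
	campaigns := make(chan struct{}, 2)
	campaigns <- struct{}{}
	campaigns <- struct{}{}
	close(campaigns)

	failedOnce := false
	leaderLoop(context.Background(), campaigns,
		func(context.Context) error { // fail only the first campaign
			if !failedOnce {
				failedOnce = true
				return errors.New("upgrade etcd data failed")
			}
			return nil
		},
		func(context.Context) error { return nil },
		func() { fmt.Println("retire leader") },
		func() { fmt.Println("resign election") },
	)
}
```
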
2 changes: 1 addition & 1 deletion dm/master/workerrpc/rawgrpc.go
@@ -96,7 +96,7 @@ func (c *GRPCClient) Close() error {
return nil
}

// Closed returns whether this grpc conn is closed. only used for test now
// Closed returns whether this grpc conn is closed. only used for test now.
func (c *GRPCClient) Closed() bool {
return c.closed.Load()
}
48 changes: 48 additions & 0 deletions pkg/shardddl/optimism/info.go
@@ -25,6 +25,7 @@ import (

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
)

// TODO: much of the code in optimistic mode is very similar to pessimistic mode, we can try to combine them together.
@@ -148,6 +149,18 @@ func (i Info) toJSON() (string, error) {
// infoFromJSON constructs Info from its JSON representation.
func infoFromJSON(s string) (i Info, err error) {
err = json.Unmarshal([]byte(s), &i)
if err != nil {
// For compatibility.
// In v2.0.2 we changed the struct of table-info-after but forgot to upgrade the values already stored in etcd.
// To keep each info's ModRevision, we convert the value after reading it instead of rewriting everything in etcd during the upgrade.
// Every Info will be upgraded once a new info is put or its lock is resolved.
oldInfo, newErr := oldInfoFromJSON(s)
if newErr != nil {
log.L().Error("unmarshal old info", log.ShortError(newErr))
return
}
return oldInfo.toInfo(), nil
}
return
}
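
The fallback triggers reliably because v2.0.2 changed what is stored under `table-info-after` from a single object to a slice, so an old value fails to unmarshal into the new struct and is routed through `oldInfoFromJSON`. A stripped-down, runnable illustration of the pattern, with strings standing in for `*model.TableInfo`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for OldInfo/Info: the new schema stores a slice under
// the same JSON key where the old schema stored a single value.
type oldInfo struct {
	TableInfoAfter string `json:"table-info-after"`
}

type newInfo struct {
	TableInfosAfter []string `json:"table-info-after"`
}

// decode mirrors infoFromJSON: try the new format first, then fall back to
// the old one and convert, so values written by pre-v2.0.2 members still load.
func decode(s string) (newInfo, error) {
	var n newInfo
	if err := json.Unmarshal([]byte(s), &n); err == nil {
		return n, nil
	}
	var o oldInfo
	if err := json.Unmarshal([]byte(s), &o); err != nil {
		return newInfo{}, err
	}
	return newInfo{TableInfosAfter: []string{o.TableInfoAfter}}, nil
}

func main() {
	n1, err1 := decode(`{"table-info-after":["t1","t2"]}`) // new format
	fmt.Println(n1, err1)                                  // {[t1 t2]} <nil>
	n2, err2 := decode(`{"table-info-after":"t1"}`)        // old format
	fmt.Println(n2, err2)                                  // {[t1]} <nil>
}
```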

@@ -294,3 +307,38 @@ func ClearTestInfoOperationSchema(cli *clientv3.Client) error {
_, err := cli.Txn(context.Background()).Then(clearSource, clearInfo, clearOp, clearISOp, clearColumns).Commit()
return err
}

// OldInfo represents info in etcd before v2.0.2.
type OldInfo struct {
Task string `json:"task"`
Source string `json:"source"`
UpSchema string `json:"up-schema"`
UpTable string `json:"up-table"`
DownSchema string `json:"down-schema"`
DownTable string `json:"down-table"`
DDLs []string `json:"ddls"`

TableInfoBefore *model.TableInfo `json:"table-info-before"` // the tracked table schema before applying the DDLs
TableInfoAfter *model.TableInfo `json:"table-info-after"` // the tracked table schema after applying the DDLs
}

// oldInfoFromJSON constructs OldInfo from its JSON representation.
func oldInfoFromJSON(s string) (oldInfo OldInfo, err error) {
err = json.Unmarshal([]byte(s), &oldInfo)
return
}

// toInfo converts OldInfo to Info.
func (oldInfo *OldInfo) toInfo() Info {
return Info{
Task: oldInfo.Task,
Source: oldInfo.Source,
UpSchema: oldInfo.UpSchema,
UpTable: oldInfo.UpTable,
DownSchema: oldInfo.DownSchema,
DownTable: oldInfo.DownTable,
DDLs: oldInfo.DDLs,
TableInfoBefore: oldInfo.TableInfoBefore,
TableInfosAfter: []*model.TableInfo{oldInfo.TableInfoAfter},
}
}
121 changes: 121 additions & 0 deletions pkg/shardddl/optimism/info_test.go
@@ -15,6 +15,7 @@ package optimism

import (
"context"
"encoding/json"
"sync"
"testing"
"time"
@@ -28,6 +29,9 @@ import (
"github.com/pingcap/tidb/util/mock"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
)

var etcdTestCli *clientv3.Client
@@ -85,6 +89,91 @@ func (t *testForEtcd) TestInfoJSON(c *C) {
c.Assert(i2, DeepEquals, i1)
}

func (t *testForEtcd) TestEtcdInfoUpgrade(c *C) {
defer clearTestInfoOperation(c)

var (
source1 = "mysql-replica-1"
source2 = "mysql-replica-2"
task1 = "task-1"
task2 = "task-2"
upSchema = "foo_1"
upTable = "bar_1"
downSchema = "foo"
downTable = "bar"
p = parser.New()
se = mock.NewContext()
tblID int64 = 222
tblI1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
tblI2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`)
tblI3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`)
tblI4 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT, c3 INT)`)
i11 = NewInfo(task1, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c1 INT"}, tblI1, []*model.TableInfo{tblI2})
i12 = NewInfo(task1, source2, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c2 INT"}, tblI2, []*model.TableInfo{tblI3})
i21 = NewInfo(task2, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c3 INT"}, tblI3, []*model.TableInfo{tblI4})
oi11 = newOldInfo(task1, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c1 INT"}, tblI1, tblI2)
oi12 = newOldInfo(task1, source2, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c2 INT"}, tblI2, tblI3)
oi21 = newOldInfo(task2, source1, upSchema, upTable, downSchema, downTable, []string{"ALTER TABLE bar ADD COLUMN c3 INT"}, tblI3, tblI4)
)

// put the oldInfo
rev1, err := putOldInfo(etcdTestCli, oi11)
c.Assert(err, IsNil)
rev2, err := putOldInfo(etcdTestCli, oi11)
c.Assert(err, IsNil)
c.Assert(rev2, Greater, rev1)

// put another key and get again with 2 infos.
rev3, err := putOldInfo(etcdTestCli, oi12)
c.Assert(err, IsNil)
c.Assert(rev3, Greater, rev2)

// get all infos.
ifm, rev4, err := GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev4, Equals, rev3)
c.Assert(ifm, HasLen, 1)
c.Assert(ifm, HasKey, task1)
c.Assert(ifm[task1], HasLen, 2)
c.Assert(ifm[task1][source1], HasLen, 1)
c.Assert(ifm[task1][source1][upSchema], HasLen, 1)
c.Assert(ifm[task1][source2], HasLen, 1)
c.Assert(ifm[task1][source2][upSchema], HasLen, 1)

i11WithVer := i11
i11WithVer.Version = 2
i11WithVer.Revision = rev2
i12WithVer := i12
i12WithVer.Version = 1
i12WithVer.Revision = rev4
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
c.Assert(ifm[task1][source2][upSchema][upTable], DeepEquals, i12WithVer)

// start the watcher.
wch := make(chan Info, 10)
ech := make(chan error, 10)
var wg sync.WaitGroup
wg.Add(1)
watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
go func() {
defer wg.Done()
WatchInfo(watchCtx, etcdTestCli, rev4+1, wch, ech) // revision+1
}()

// put another oldInfo for a different task; its version starts from 1.
// this simulates a v2.0.1 worker writing while a v2.0.2 master watches.
rev5, err := putOldInfo(etcdTestCli, oi21)
c.Assert(err, IsNil)
infoWithVer := <-wch
i21WithVer := i21
i21WithVer.Version = 1
i21WithVer.Revision = rev5
c.Assert(infoWithVer, DeepEquals, i21WithVer)
c.Assert(len(ech), Equals, 0)
}

func (t *testForEtcd) TestInfoEtcd(c *C) {
defer clearTestInfoOperation(c)

@@ -237,3 +326,35 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(info, DeepEquals, i12c)
c.Assert(len(ech), Equals, 0)
}

func newOldInfo(task, source, upSchema, upTable, downSchema, downTable string,
ddls []string, tableInfoBefore *model.TableInfo, tableInfoAfter *model.TableInfo) OldInfo {
return OldInfo{
Task: task,
Source: source,
UpSchema: upSchema,
UpTable: upTable,
DownSchema: downSchema,
DownTable: downTable,
DDLs: ddls,
TableInfoBefore: tableInfoBefore,
TableInfoAfter: tableInfoAfter,
}
}

func putOldInfo(cli *clientv3.Client, oldInfo OldInfo) (int64, error) {
data, err := json.Marshal(oldInfo)
if err != nil {
return 0, err
}
key := common.ShardDDLOptimismInfoKeyAdapter.Encode(oldInfo.Task, oldInfo.Source, oldInfo.UpSchema, oldInfo.UpTable)

ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

resp, err := cli.Put(ctx, key, string(data))
if err != nil {
return 0, err
}
return resp.Header.Revision, nil
}
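
For reference, a condensed round-trip in the style of `TestEtcdInfoUpgrade` above, reusing this file's fixtures (this exact test is not part of the PR): stage an old-format value with `putOldInfo`, then check that `GetAllInfo` hands back the converted shape.

```go
// TestOldInfoRoundTrip is illustrative only; it reuses the fixtures defined
// in this file (etcdTestCli, createTableInfo, newOldInfo, putOldInfo).
func (t *testForEtcd) TestOldInfoRoundTrip(c *C) {
	defer clearTestInfoOperation(c)

	p := parser.New()
	se := mock.NewContext()
	tblBefore := createTableInfo(c, p, se, 111, `CREATE TABLE bar (id INT PRIMARY KEY)`)
	tblAfter := createTableInfo(c, p, se, 111, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`)

	oldI := newOldInfo("task-1", "mysql-replica-1", "foo_1", "bar_1", "foo", "bar",
		[]string{"ALTER TABLE bar ADD COLUMN c1 INT"}, tblBefore, tblAfter)
	rev, err := putOldInfo(etcdTestCli, oldI)
	c.Assert(err, IsNil)

	// GetAllInfo decodes each value via infoFromJSON, so the old-format
	// value comes back through the fallback path.
	ifm, getRev, err := GetAllInfo(etcdTestCli)
	c.Assert(err, IsNil)
	c.Assert(getRev, Equals, rev)
	// the single old TableInfoAfter comes back as a one-element slice.
	c.Assert(ifm["task-1"]["mysql-replica-1"]["foo_1"]["bar_1"].TableInfosAfter, HasLen, 1)
}
```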