Skip to content

Commit

Permalink
Merge branch 'master' into openapi-config-template-dmctl
Browse files Browse the repository at this point in the history
  • Loading branch information
okJiang authored Jan 17, 2022
2 parents 69c8a71 + a6802a5 commit 5896209
Showing 1 changed file with 69 additions and 39 deletions.
108 changes: 69 additions & 39 deletions dm/dm/master/shardddl/optimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) {
defer clearOptimistTestSourceInfoOperation(c)

var (
watchTimeout = 2 * time.Second
watchTimeout = 5 * time.Second
logger = log.L()
o = NewOptimist(&logger)
task = "task-test-optimist"
Expand Down Expand Up @@ -654,15 +654,20 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) {
// wait operation for i1 become available.
opCh := make(chan optimism.Operation, 10)
errCh := make(chan error, 10)
ctx2, cancel2 := context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i1.Task, i1.Source, i1.UpSchema, i1.UpTable, rev1, opCh, errCh)
ctx2, cancel2 := context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i1.Task, i1.Source, i1.UpSchema, i1.UpTable, rev1, opCh, errCh)
select {
case <-time.After(watchTimeout):
c.Fatal("timeout")
case op1 := <-opCh:
c.Assert(op1.DDLs, DeepEquals, DDLs1)
c.Assert(op1.ConflictStage, Equals, optimism.ConflictNone)
}

cancel2()
close(opCh)
close(errCh)
c.Assert(len(opCh), Equals, 1)
op1 := <-opCh
c.Assert(op1.DDLs, DeepEquals, DDLs1)
c.Assert(op1.ConflictStage, Equals, optimism.ConflictNone)
c.Assert(len(opCh), Equals, 0)
c.Assert(len(errCh), Equals, 0)

// PUT i2, conflict will be detected.
Expand All @@ -671,15 +676,21 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) {
// wait operation for i2 become available.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i2.Task, i2.Source, i2.UpSchema, i2.UpTable, rev2, opCh, errCh)

ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i2.Task, i2.Source, i2.UpSchema, i2.UpTable, rev2, opCh, errCh)
select {
case <-time.After(watchTimeout):
c.Fatal("timeout")
case op2 := <-opCh:
c.Assert(op2.DDLs, DeepEquals, []string{})
c.Assert(op2.ConflictStage, Equals, optimism.ConflictDetected)
}

cancel2()
close(opCh)
close(errCh)
c.Assert(len(opCh), Equals, 1)
op2 := <-opCh
c.Assert(op2.DDLs, DeepEquals, []string{})
c.Assert(op2.ConflictStage, Equals, optimism.ConflictDetected)
c.Assert(len(opCh), Equals, 0)
c.Assert(len(errCh), Equals, 0)

// PUT i3, no conflict now.
Expand All @@ -689,15 +700,19 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) {
// wait operation for i3 become available.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i3.Task, i3.Source, i3.UpSchema, i3.UpTable, rev3, opCh, errCh)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i3.Task, i3.Source, i3.UpSchema, i3.UpTable, rev3, opCh, errCh)
select {
case <-time.After(watchTimeout):
c.Fatal("timeout")
case op3 := <-opCh:
c.Assert(op3.DDLs, DeepEquals, []string{})
c.Assert(op3.ConflictStage, Equals, optimism.ConflictNone)
}
cancel2()
close(opCh)
close(errCh)
c.Assert(len(opCh), Equals, 1)
op3 := <-opCh
c.Assert(op3.DDLs, DeepEquals, []string{})
c.Assert(op3.ConflictStage, Equals, optimism.ConflictNone)
c.Assert(len(opCh), Equals, 0)
c.Assert(len(errCh), Equals, 0)
}

Expand All @@ -707,7 +722,7 @@ func (t *testOptimist) TestOptimistLockMultipleTarget(c *C) {
var (
backOff = 30
waitTime = 100 * time.Millisecond
watchTimeout = 2 * time.Second
watchTimeout = 5 * time.Second
logger = log.L()
o = NewOptimist(&logger)
task = "test-optimist-lock-multiple-target"
Expand Down Expand Up @@ -821,15 +836,20 @@ func (t *testOptimist) TestOptimistLockMultipleTarget(c *C) {
// wait operation for i12 become available.
opCh := make(chan optimism.Operation, 10)
errCh := make(chan error, 10)
ctx2, cancel2 := context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev1, opCh, errCh)
var op12 optimism.Operation
ctx2, cancel2 := context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev1, opCh, errCh)
select {
case <-time.After(watchTimeout):
c.Fatal("timeout")
case op12 = <-opCh:
c.Assert(op12.DDLs, DeepEquals, DDLs)
c.Assert(op12.ConflictStage, Equals, optimism.ConflictNone)
}
cancel2()
close(opCh)
close(errCh)
c.Assert(len(opCh), Equals, 1)
op12 := <-opCh
c.Assert(op12.DDLs, DeepEquals, DDLs)
c.Assert(op12.ConflictStage, Equals, optimism.ConflictNone)
c.Assert(len(opCh), Equals, 0)
c.Assert(len(errCh), Equals, 0)

// mark op11 and op12 as done, the lock should be resolved.
Expand All @@ -855,15 +875,20 @@ func (t *testOptimist) TestOptimistLockMultipleTarget(c *C) {
// wait operation for i22 become available.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i22.Task, i22.Source, i22.UpSchema, i22.UpTable, rev2, opCh, errCh)
var op22 optimism.Operation
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i22.Task, i22.Source, i22.UpSchema, i22.UpTable, rev2, opCh, errCh)
select {
case <-time.After(watchTimeout):
c.Fatal("timeout")
case op22 = <-opCh:
c.Assert(op22.DDLs, DeepEquals, DDLs)
c.Assert(op22.ConflictStage, Equals, optimism.ConflictNone)
}
cancel2()
close(opCh)
close(errCh)
c.Assert(len(opCh), Equals, 1)
op22 := <-opCh
c.Assert(op22.DDLs, DeepEquals, DDLs)
c.Assert(op22.ConflictStage, Equals, optimism.ConflictNone)
c.Assert(len(opCh), Equals, 0)
c.Assert(len(errCh), Equals, 0)

// mark op21 and op22 as done, the lock should be resolved.
Expand Down Expand Up @@ -892,7 +917,7 @@ func (t *testOptimist) TestOptimistInitSchema(c *C) {
var (
backOff = 30
waitTime = 100 * time.Millisecond
watchTimeout = 2 * time.Second
watchTimeout = 5 * time.Second
logger = log.L()
o = NewOptimist(&logger)
task = "test-optimist-init-schema"
Expand Down Expand Up @@ -954,15 +979,20 @@ func (t *testOptimist) TestOptimistInitSchema(c *C) {
// wait operation for i12 become available.
opCh := make(chan optimism.Operation, 10)
errCh := make(chan error, 10)
ctx2, cancel2 := context.WithTimeout(ctx, watchTimeout)
optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev1, opCh, errCh)
var op12 optimism.Operation
ctx2, cancel2 := context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev1, opCh, errCh)
select {
case <-time.After(watchTimeout):
c.Fatal("timeout")
case op12 = <-opCh:
c.Assert(op12.DDLs, DeepEquals, DDLs1)
c.Assert(op12.ConflictStage, Equals, optimism.ConflictNone)
}
cancel2()
close(opCh)
close(errCh)
c.Assert(len(opCh), Equals, 1)
op12 := <-opCh
c.Assert(op12.DDLs, DeepEquals, DDLs1)
c.Assert(op12.ConflictStage, Equals, optimism.ConflictNone)
c.Assert(len(opCh), Equals, 0)
c.Assert(len(errCh), Equals, 0)

// mark op11 and op12 as done, the lock should be resolved.
Expand Down

0 comments on commit 5896209

Please sign in to comment.